[jboss-cvs] JBoss Messaging SVN: r4837 - in trunk/src/main/org/jboss/messaging/core: remoting/impl/wireformat and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 19 15:10:48 EDT 2008
Author: timfox
Date: 2008-08-19 15:10:48 -0400 (Tue, 19 Aug 2008)
New Revision: 4837
Added:
trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java
trunk/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryResponseMessage.java
trunk/src/main/org/jboss/messaging/core/server/CommandManager.java
trunk/src/main/org/jboss/messaging/core/server/impl/CommandManagerImpl.java
Log:
Removed connection
Added: trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ConnectionLifeCycleListener.java 2008-08-19 19:10:48 UTC (rev 4837)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.spi.Connection;
+
+/**
+ *
+ * A ConnectionLifeCycleListener
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface ConnectionLifeCycleListener
+{
+ void connectionCreated(Connection connection);
+
+ void connectionDestroyed(Object connectionID);
+
+ void connectionException(Object connectionID, MessagingException me);
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/MessagingBuffer.java 2008-08-19 19:10:48 UTC (rev 4837)
@@ -0,0 +1,125 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A MessagingBuffer
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface MessagingBuffer
+{
+ void putByte(byte val);
+
+ void putBytes(byte[] bytes);
+
+ void putBytes(byte[] bytes, int offset, int length);
+
+ void putInt(int val);
+
+ void putInt(int pos, int val);
+
+ void putLong(long val);
+
+ void putShort(short val);
+
+ void putDouble(double val);
+
+ void putFloat(float val);
+
+ void putBoolean(boolean val);
+
+ void putChar(char val);
+
+ void putNullableString(String val);
+
+ void putString(String val);
+
+ void putSimpleString(SimpleString val);
+
+ void putNullableSimpleString(SimpleString val);
+
+ void putUTF(String utf) throws Exception;
+
+ byte getByte();
+
+ short getUnsignedByte();
+
+ void getBytes(byte[] bytes);
+
+ void getBytes(byte[] bytes, int offset, int length);
+
+ int getInt();
+
+ long getLong();
+
+ short getShort();
+
+ int getUnsignedShort();
+
+ double getDouble();
+
+ float getFloat();
+
+ boolean getBoolean();
+
+ char getChar();
+
+ String getString();
+
+ String getNullableString();
+
+ SimpleString getSimpleString();
+
+ SimpleString getNullableSimpleString();
+
+ String getUTF() throws Exception;
+
+ byte[] array();
+
+ int remaining();
+
+ int capacity();
+
+ int limit();
+
+ void limit(int limit);
+
+ void flip();
+
+ void position(int position);
+
+ int position();
+
+ void rewind();
+
+ MessagingBuffer slice();
+
+ MessagingBuffer createNewBuffer(int len);
+
+ Object getUnderlyingBuffer();
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryMessage.java 2008-08-19 19:10:48 UTC (rev 4837)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat.cluster;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+
+/**
+ *
+ * A ConsumerReplicateDeliveryMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ConsumerReplicateDeliveryMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long messageID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ConsumerReplicateDeliveryMessage(final long messageID)
+ {
+ super(REPLICATE_DELIVERY);
+
+ this.messageID = messageID;
+ }
+
+ public ConsumerReplicateDeliveryMessage()
+ {
+ super(REPLICATE_DELIVERY);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getMessageID()
+ {
+ return messageID;
+ }
+
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.putLong(messageID);
+ }
+
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ messageID = buffer.getLong();
+ }
+
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", messageID=" + messageID + "]";
+ }
+
+ public boolean equals(Object other)
+ {
+ if (other instanceof ConsumerReplicateDeliveryMessage == false)
+ {
+ return false;
+ }
+
+ ConsumerReplicateDeliveryMessage r = (ConsumerReplicateDeliveryMessage)other;
+
+ return super.equals(other) && this.messageID == r.messageID;
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryResponseMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/cluster/ConsumerReplicateDeliveryResponseMessage.java 2008-08-19 19:10:48 UTC (rev 4837)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat.cluster;
+
+import org.jboss.messaging.core.remoting.MessagingBuffer;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+
+/**
+ *
+ * A ConsumerReplicateDeliveryResponseMessage
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ConsumerReplicateDeliveryResponseMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long messageID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ConsumerReplicateDeliveryResponseMessage(final long messageID)
+ {
+ super(REPLICATE_DELIVERY_RESPONSE);
+
+ this.messageID = messageID;
+ }
+
+ public ConsumerReplicateDeliveryResponseMessage()
+ {
+ super(REPLICATE_DELIVERY_RESPONSE);
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getMessageID()
+ {
+ return messageID;
+ }
+
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.putLong(messageID);
+ }
+
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ messageID = buffer.getLong();
+ }
+
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", messageID=" + messageID + "]";
+ }
+
+ public boolean equals(Object other)
+ {
+ if (other instanceof ConsumerReplicateDeliveryResponseMessage == false)
+ {
+ return false;
+ }
+
+ ConsumerReplicateDeliveryResponseMessage r = (ConsumerReplicateDeliveryResponseMessage)other;
+
+ return super.equals(other) && this.messageID == r.messageID;
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: trunk/src/main/org/jboss/messaging/core/server/CommandManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/CommandManager.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/CommandManager.java 2008-08-19 19:10:48 UTC (rev 4837)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.Packet;
+
+/**
+ * A CommandManagerImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface CommandManager
+{
+ Packet sendCommandBlocking(long targetID, Packet packet) throws MessagingException;
+
+ void sendCommandOneway(long targetID, Packet packet);
+
+ void packetProcessed(Packet packet);
+
+ void close();
+}
Added: trunk/src/main/org/jboss/messaging/core/server/impl/CommandManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/CommandManagerImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/CommandManagerImpl.java 2008-08-19 19:10:48 UTC (rev 4837)
@@ -0,0 +1,177 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.server.impl;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.jboss.messaging.core.server.CommandManager;
+
+/**
+ * A CommandManagerImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class CommandManagerImpl implements CommandManager, PacketHandler
+{
+ private static final Logger log = Logger.getLogger(CommandManagerImpl.class);
+
+ private final java.util.Queue<Packet> resendCache = new ConcurrentLinkedQueue<Packet>();
+
+ private final int commandBatchSize;
+
+ //These members don't need to be synchronized since only called by one thread concurrently
+ //but do need to be volatile since that could be a different thread each time
+
+ private volatile int lastCommandID = -1;
+
+ private volatile int nextConfirmation;
+
+ private volatile int commandIDSequence = 0;
+
+ private final RemotingConnection remotingConnection;
+
+ private final long sessionTargetID;
+
+ private final long localCommandResponseTargetID;
+
+ private final long remoteCommandResponseTargetID;
+
+ public CommandManagerImpl(final int commandBatchSize, final RemotingConnection remotingConnection,
+ final PacketDispatcher dispatcher,
+ final long sessionTargetID,
+ final long localCommandResponseTargetID,
+ final long remoteCommandResponseTargetID)
+ {
+ this.commandBatchSize = commandBatchSize;
+
+ nextConfirmation = commandBatchSize - 1;
+
+ this.remotingConnection = remotingConnection;
+
+ this.sessionTargetID = sessionTargetID;
+
+ this.localCommandResponseTargetID = localCommandResponseTargetID;
+
+ this.remoteCommandResponseTargetID = remoteCommandResponseTargetID;
+
+ dispatcher.register(this);
+ }
+
+ //Needs to be synchronized since on the server messages can be sent to the client same time as blocking
+ //responses are returned
+ public synchronized Packet sendCommandBlocking(final long targetID, final Packet packet) throws MessagingException
+ {
+ setCommandID(packet);
+
+ Packet response = remotingConnection.sendBlocking(targetID, sessionTargetID, packet, this);
+
+ if (response.getType() == PacketImpl.EXCEPTION)
+ {
+ MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
+
+ throw mem.getException();
+ }
+
+ return response;
+ }
+
+ public synchronized void sendCommandOneway(final long targetID, final Packet packet)
+ {
+ setCommandID(packet);
+
+ remotingConnection.sendOneWay(targetID, sessionTargetID, packet);
+ }
+
+ public void packetProcessed(final Packet packet)
+ {
+ long commandID = packet.getCommandID();
+
+ if (commandID != ++lastCommandID)
+ {
+ throw new IllegalStateException("Command id out of sequence, got " + commandID + " expected " + lastCommandID);
+ }
+
+ if (commandID == nextConfirmation)
+ {
+ Packet confirmed = new PacketsConfirmedMessage(lastCommandID);
+
+ confirmed.setTargetID(remoteCommandResponseTargetID);
+
+ nextConfirmation += commandBatchSize;
+
+ remotingConnection.sendOneWay(confirmed);
+ }
+ }
+
+ public void close()
+ {
+ remotingConnection.getPacketDispatcher().unregister(localCommandResponseTargetID);
+ }
+
+ // PacketHandler implementation --------------------------------------------------------------
+
+ public long getID()
+ {
+ return localCommandResponseTargetID;
+ }
+
+ public void handle(final Object connectionID, final Packet m)
+ {
+ PacketsConfirmedMessage msg = (PacketsConfirmedMessage)m;
+
+ Packet packet;
+ do
+ {
+ packet = resendCache.poll();
+ }
+ while (packet.getCommandID() < msg.getCommandID());
+ }
+
+ // Public -----------------------------------------------------------------------------------
+
+ public int getUnconfirmedPackets()
+ {
+ return this.resendCache.size();
+ }
+
+ // Private -----------------------------------------------------------------------------------
+
+ private void setCommandID(final Packet packet)
+ {
+ int commandID = commandIDSequence++;
+
+ packet.setCommandID(commandID);
+
+ resendCache.add(packet);
+ }
+
+}
More information about the jboss-cvs-commits
mailing list