[infinispan-commits] Infinispan SVN: r1500 - in trunk/server: core/src/main/java/org/infinispan/server/core/transport and 4 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Feb 11 13:14:12 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-02-11 13:14:11 -0500 (Thu, 11 Feb 2010)
New Revision: 1500
Added:
trunk/server/core/src/main/java/org/infinispan/server/core/transport/Decoder.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/ExceptionEvent.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyExceptionEvent.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/MemcachedDecoder.java
Modified:
trunk/server/core/src/main/java/org/infinispan/server/core/CommandDecoder.java
trunk/server/core/src/main/java/org/infinispan/server/core/CommandFactory.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/NettyMemcachedDecoder.java
Log:
[ISPN-173] (Build memcached server module) Abstracted the replaying decoder code so that the memcached decoder does not directly depend on Netty any more. Even though hot rod will use a different decoder, this adds an abstraction layer between the decoder used and the real one used underneath.
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/CommandDecoder.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/CommandDecoder.java 2010-02-11 12:43:29 UTC (rev 1499)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/CommandDecoder.java 2010-02-11 18:14:11 UTC (rev 1500)
@@ -28,6 +28,7 @@
* @author Galder Zamarreño
* @since 4.0
*/
+@Deprecated
public interface CommandDecoder {
}
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/CommandFactory.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/CommandFactory.java 2010-02-11 12:43:29 UTC (rev 1499)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/CommandFactory.java 2010-02-11 18:14:11 UTC (rev 1500)
@@ -25,7 +25,7 @@
import java.io.IOException;
/**
- * CommandFactory.
+ * TODO: This only deals with text based protocols, needs further thought. Will be looked into when implementing Hot Rod
*
* @author Galder Zamarreño
* @since 4.0
Added: trunk/server/core/src/main/java/org/infinispan/server/core/transport/Decoder.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/Decoder.java (rev 0)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/Decoder.java 2010-02-11 18:14:11 UTC (rev 1500)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public interface Decoder<T extends Enum<T>> {
+ Object decode(ChannelHandlerContext ctx, ChannelBuffer buffer, T state) throws Exception;
+ void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception;
+
+ interface Checkpointer<T> {
+ void checkpoint(T state);
+ }
+}
Added: trunk/server/core/src/main/java/org/infinispan/server/core/transport/ExceptionEvent.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/ExceptionEvent.java (rev 0)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/ExceptionEvent.java 2010-02-11 18:14:11 UTC (rev 1500)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public interface ExceptionEvent {
+ Throwable getCause();
+}
Added: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyExceptionEvent.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyExceptionEvent.java (rev 0)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyExceptionEvent.java 2010-02-11 18:14:11 UTC (rev 1500)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport.netty;
+
+import org.infinispan.server.core.transport.ExceptionEvent;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class NettyExceptionEvent implements ExceptionEvent {
+ final org.jboss.netty.channel.ExceptionEvent event;
+
+ NettyExceptionEvent(org.jboss.netty.channel.ExceptionEvent event) {
+ this.event = event;
+ }
+
+ @Override
+ public Throwable getCause() {
+ return event.getCause();
+ }
+}
Added: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java (rev 0)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java 2010-02-11 18:14:11 UTC (rev 1500)
@@ -0,0 +1,91 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport.netty;
+
+import org.infinispan.server.core.transport.ChannelBuffer;
+import org.infinispan.server.core.transport.ChannelHandlerContext;
+import org.infinispan.server.core.transport.Decoder;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class NettyReplayingDecoder<T extends Enum<T>> extends ReplayingDecoder<T> implements Decoder.Checkpointer<T> {
+ final Decoder<T> decoder;
+
+ public NettyReplayingDecoder(Decoder<T> decoder, T initialState) {
+ super(initialState, true);
+ this.decoder = decoder;
+ }
+
+ @Override
+ protected Object decode(org.jboss.netty.channel.ChannelHandlerContext nCtx, org.jboss.netty.channel.Channel channel,
+ org.jboss.netty.buffer.ChannelBuffer nBuffer, T state) throws Exception {
+ ChannelHandlerContext ctx = new NettyChannelHandlerContext(nCtx);
+ ChannelBuffer buffer = new NettyChannelBuffer(nBuffer);
+ return decoder.decode(ctx, buffer, state);
+ }
+
+ @Override
+ public void checkpoint(T state) {
+ super.checkpoint(state);
+ }
+
+ @Override
+ public void exceptionCaught(org.jboss.netty.channel.ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ decoder.exceptionCaught(new NettyChannelHandlerContext(ctx), new NettyExceptionEvent(e));
+ }
+
+ // ReplayingDecoder<T> decoder;
+//
+// public NettyReplayingDecoder(ReplayingDecoder<T> decoder) {
+// this.decoder = decoder;
+// }
+//
+// @Override
+// public Object decode(ChannelHandlerContext ctx, ChannelBuffer buffer, T state) throws Exception {
+// return decode(((NettyChannelHandlerContext) ctx).ctx, ((NettyChannelHandlerContext) ctx).ctx.getChannel(),
+// ((NettyChannelBuffer) buffer).buffer, state);
+// }
+//
+// @Override
+// protected Object decode(org.jboss.netty.channel.ChannelHandlerContext ctx, org.jboss.netty.channel.Channel channel,
+// org.jboss.netty.buffer.ChannelBuffer buffer, T state) throws Exception {
+// return null;
+// }
+//
+// class AccessorReplayingDecoder<T> extends ReplayingDecoder<T> {
+// ReplayingDecoder<T> decoder;
+//
+// @Override
+// protected Object decode(org.jboss.netty.channel.ChannelHandlerContext ctx, org.jboss.netty.channel.Channel channel, org.jboss.netty.buffer.ChannelBuffer buffer, T state) throws Exception {
+// return null; // TODO: Customise this generated block
+// }
+// }
+
+}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java 2010-02-11 12:43:29 UTC (rev 1499)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java 2010-02-11 18:14:11 UTC (rev 1500)
@@ -30,8 +30,9 @@
import org.infinispan.Cache;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.server.core.Server;
+import org.infinispan.server.core.transport.netty.NettyReplayingDecoder;
import org.infinispan.server.core.transport.netty.NettyServer;
-import org.infinispan.server.memcached.transport.netty.NettyMemcachedDecoder;
+import org.infinispan.server.memcached.transport.MemcachedDecoder;
import org.infinispan.server.core.InterceptorChain;
import org.infinispan.server.memcached.commands.TextCommandHandler;
import org.infinispan.server.memcached.commands.Value;
@@ -82,10 +83,14 @@
public void start() throws Exception {
InterceptorChain chain = TextProtocolInterceptorChainFactory.getInstance(cache).buildInterceptorChain();
- NettyMemcachedDecoder decoder = new NettyMemcachedDecoder(cache, chain, scheduler);
+ MemcachedDecoder decoder = new MemcachedDecoder(cache, chain, scheduler);
+ NettyReplayingDecoder nettyDecoder = new NettyReplayingDecoder<MemcachedDecoder.State>(decoder,
+ MemcachedDecoder.State.READ_COMMAND);
+ decoder.setCheckpointer(nettyDecoder);
+
TextCommandHandler commandHandler = new TextCommandHandler(cache, chain);
- server = new NettyServer(commandHandler, decoder, new InetSocketAddress(host, port),
+ server = new NettyServer(commandHandler, nettyDecoder, new InetSocketAddress(host, port),
masterThreads, workerThreads, cache.getName());
server.start();
log.info("Started Memcached text server bound to {0}:{1}", host, port);
Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/MemcachedDecoder.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/MemcachedDecoder.java (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/MemcachedDecoder.java 2010-02-11 18:14:11 UTC (rev 1500)
@@ -0,0 +1,153 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.memcached.transport;
+
+import org.infinispan.Cache;
+import org.infinispan.server.core.Command;
+import org.infinispan.server.core.InterceptorChain;
+import org.infinispan.server.core.transport.Channel;
+import org.infinispan.server.core.transport.ChannelBuffer;
+import org.infinispan.server.core.transport.ChannelBuffers;
+import org.infinispan.server.core.transport.ChannelHandlerContext;
+import org.infinispan.server.core.transport.Decoder;
+import org.infinispan.server.core.transport.ExceptionEvent;
+import org.infinispan.server.memcached.Reply;
+import org.infinispan.server.memcached.UnknownCommandException;
+import org.infinispan.server.memcached.commands.CommandFactory;
+import org.infinispan.server.memcached.commands.StorageCommand;
+import org.infinispan.server.memcached.commands.TextCommand;
+import org.infinispan.server.memcached.commands.Value;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.StreamCorruptedException;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.infinispan.server.memcached.TextProtocolUtil.CR;
+import static org.infinispan.server.memcached.TextProtocolUtil.CRLF;
+import static org.infinispan.server.memcached.TextProtocolUtil.LF;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class MemcachedDecoder implements Decoder<MemcachedDecoder.State> {
+ private static final Log log = LogFactory.getLog(MemcachedDecoder.class);
+ private final CommandFactory factory;
+ private volatile TextCommand command;
+ private Decoder.Checkpointer checkpointer;
+
+ public enum State {
+ READ_COMMAND, READ_UNSTRUCTURED_DATA
+ }
+
+ public MemcachedDecoder(Cache<String, Value> cache, InterceptorChain chain, ScheduledExecutorService scheduler) {
+ this.factory = new CommandFactory(cache, chain, scheduler);
+ }
+
+ public void setCheckpointer(Checkpointer checkpointer) {
+ this.checkpointer = checkpointer;
+ }
+
+ @Override
+ public Object decode(ChannelHandlerContext ctx, ChannelBuffer buffer, State state) throws Exception {
+ switch (state) {
+ case READ_COMMAND:
+ command = factory.createCommand(readLine(buffer));
+ if (command.getType().isStorage())
+ checkpointer.checkpoint(State.READ_UNSTRUCTURED_DATA);
+ else
+ return command;
+ break;
+ case READ_UNSTRUCTURED_DATA:
+ StorageCommand storageCmd = (StorageCommand) command;
+ byte[] data= new byte[storageCmd.getParams().getBytes()];
+ buffer.readBytes(data, 0, data.length);
+ byte next = buffer.readByte();
+ if (next == CR) {
+ next = buffer.readByte();
+ if (next == LF) {
+ try {
+ return reset(storageCmd.setData(data));
+ } catch (IOException ioe) {
+ checkpointer.checkpoint(State.READ_COMMAND);
+ throw ioe;
+ }
+ } else {
+ throw new StreamCorruptedException("Expecting \r\n after data block");
+ }
+ } else {
+ throw new StreamCorruptedException("Expecting \r\n after data block");
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ Throwable t = e.getCause();
+ log.error("Unexpected exception", t);
+ Channel ch = ctx.getChannel();
+ ChannelBuffers buffers = ctx.getChannelBuffers();
+ if (t instanceof UnknownCommandException) {
+ ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(Reply.ERROR.bytes()), buffers.wrappedBuffer(CRLF)));
+ } else if (t instanceof IOException) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(Reply.CLIENT_ERROR).append(' ').append(t);
+ ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(sb.toString().getBytes()), buffers.wrappedBuffer(CRLF)));
+ } else {
+ StringBuilder sb = new StringBuilder();
+ sb.append(Reply.SERVER_ERROR).append(' ').append(t);
+ ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(sb.toString().getBytes()), buffers.wrappedBuffer(CRLF)));
+ }
+ }
+
+ private Object reset(Command c) {
+ this.command = null;
+ checkpointer.checkpoint(State.READ_COMMAND);
+ return c;
+ }
+
+ private String readLine(ChannelBuffer buffer) {
+ StringBuilder sb = new StringBuilder(64);
+ int lineLength = 0;
+ while (true) {
+ byte next = buffer.readByte();
+ if (next == CR) {
+ next = buffer.readByte();
+ if (next == LF) {
+ return sb.toString();
+ }
+ } else if (next == LF) {
+ return sb.toString();
+ } else {
+ lineLength++;
+ sb.append((char) next);
+ }
+ }
+ }
+}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/NettyMemcachedDecoder.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/NettyMemcachedDecoder.java 2010-02-11 12:43:29 UTC (rev 1499)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/NettyMemcachedDecoder.java 2010-02-11 18:14:11 UTC (rev 1500)
@@ -54,6 +54,7 @@
* @author Galder Zamarreño
* @since 4.0
*/
+@Deprecated
public class NettyMemcachedDecoder extends ReplayingDecoder<NettyMemcachedDecoder.State> {
private static final Log log = LogFactory.getLog(NettyMemcachedDecoder.class);
More information about the infinispan-commits mailing list