import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.log4j.Logger; /** * <pre> * 分级事件驱动架构SEDA( Staged E ventDriven A rch itecture)的构造类 * @author kanpiaoxue * @param <Task> SEDAThreadPoolUtils内置的 interface Task * 2013-11-07 * </pre> */ public final class SEDAThreadPool { private static final Logger LOGGER = Logger.getLogger(SEDAThreadPool.class); private final BlockingQueue<SEDAThreadPool.Task> inputQueue; private final BlockingQueue<SEDAThreadPool.Task> outputQueue; private final ExecutorService exec; private final String threadName; private final int threadPoolSize; private final ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task> conflictMap; private volatile boolean hasOutput; private volatile boolean hasConflictMap; /** * <pre> * @param threadName 工作线程的名称 * @param inputQueue 输入任务的队列 * @param outputQueue 输出任务的队列。当不存在输出队列是,该项可以为 null * @param exec 执行的线程池。如果该项为 null,则采用默认的 Executors.newCachedThreadPool() * @param threadPoolSize 线程池的大小 * </pre> */ public SEDAThreadPool(String threadName, BlockingQueue<Task> inputQueue, BlockingQueue<Task> outputQueue, ExecutorService exec, int threadPoolSize) { this(threadName, inputQueue, outputQueue, exec, threadPoolSize, null); } /** * <pre> * @param threadName 工作线程的名称 * @param inputQueue 输入任务的队列 * @param exec 执行的线程池。如果该项为 null,则采用默认的 Executors.newCachedThreadPool() * @param threadPoolSize 线程池的大小 * </pre> */ public SEDAThreadPool(String threadName, BlockingQueue<Task> inputQueue, ExecutorService exec, int threadPoolSize) { this(threadName, inputQueue, null, exec, threadPoolSize, null); } /** * <pre> * @param threadName 工作线程的名称 * @param inputQueue 输入任务的队列 * @param exec 执行的线程池。如果该项为 null,则采用默认的 Executors.newCachedThreadPool() * @param threadPoolSize 线程池的大小 * @param conflictMap Task发生冲突Map :和 conflictMap 中的key存在冲突的任务都不会被执行 * </pre> */ public SEDAThreadPool( String threadName, BlockingQueue<Task> inputQueue, ExecutorService exec, int threadPoolSize, ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task> conflictMap) { this(threadName, inputQueue, null, exec, threadPoolSize, conflictMap); } /** * <pre> * @param threadName 工作线程的名称 * @param inputQueue 输入任务的队列 * @param outputQueue 输出任务的队列。当不存在输出队列是,该项可以为 null * @param exec 执行的线程池。如果该项为 null,则采用默认的 Executors.newCachedThreadPool() * @param threadPoolSize 线程池的大小 * @param conflictMap Task发生冲突Map :和 conflictMap 中的key存在冲突的任务都不会被执行 * </pre> */ public SEDAThreadPool( String threadName, BlockingQueue<SEDAThreadPool.Task> inputQueue, BlockingQueue<SEDAThreadPool.Task> outputQueue, ExecutorService exec, int threadPoolSize, ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task> conflictMap) { super(); this.threadName = threadName; this.inputQueue = inputQueue; this.outputQueue = outputQueue; this.exec = (null == exec) ? Executors.newCachedThreadPool() : exec; this.threadPoolSize = threadPoolSize; this.conflictMap = conflictMap; if (null != conflictMap) { hasConflictMap = true; } this.hasOutput = (null != outputQueue); } /** * <pre> * 创建SEDA线程池 * </pre> */ public void createSEDAThreadPool() { for (int i = 0; i < threadPoolSize; i++) { exec.execute(new InnerConsumer(threadName + i, inputQueue, outputQueue)); } } private class InnerConsumer implements Runnable { private final String name; private final BlockingQueue<SEDAThreadPool.Task> inputQueue; private final BlockingQueue<SEDAThreadPool.Task> outputQueue; public InnerConsumer(String name, BlockingQueue<SEDAThreadPool.Task> inputQueue, BlockingQueue<SEDAThreadPool.Task> outputQueue) { super(); this.name = name; this.inputQueue = inputQueue; this.outputQueue = outputQueue; } @Override public void run() { Thread.currentThread().setName(name); if (LOGGER.isInfoEnabled()) { LOGGER.info(name + " start to work."); } while (true) { try { SEDAThreadPool.Task inputTask = inputQueue.take(); try { if (hasConflictMap) { if (null != conflictMap.putIfAbsent(inputTask, inputTask)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(inputTask + " has found in conflictMap, it is abandoned."); } continue; } else { if (LOGGER.isDebugEnabled()) { LOGGER.debug(inputTask + " is put into conflictMap."); } } } SEDAThreadPool.Task outputTask = consume(inputTask); if (hasOutput && null != outputTask) { outputQueue.put(outputTask); if (LOGGER.isDebugEnabled()) { LOGGER.debug(outputTask + " put into outputQueue."); } } } finally { if (hasConflictMap && null != conflictMap.remove(inputTask)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(inputTask + " is removed from conflictMap."); } } } } catch (Exception e) { LOGGER.error("Error:" + e.getMessage(), e); } } } private SEDAThreadPool.Task consume(SEDAThreadPool.Task task) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(task + " takes from inputQueue."); } return task.execute() ? task : null; } } /** * <pre> * SEDA的Task接口 * @author kanpiaoxue * </pre> * */ public interface Task { public boolean execute(); } }
import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import com.wanmei.parallel.seda.SEDAThreadPool.Task; public class Test { /** * @param args */ public static void main(String[] args) { BlockingQueue<SEDAThreadPool.Task> inputQueue = new LinkedBlockingQueue<SEDAThreadPool.Task>(); BlockingQueue<SEDAThreadPool.Task> inputQueue2 = new LinkedBlockingQueue<SEDAThreadPool.Task>(); BlockingQueue<SEDAThreadPool.Task> inputQueue3 = new LinkedBlockingQueue<SEDAThreadPool.Task>(); ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task> conflictMap = test(); int threadPoolSize = Runtime.getRuntime().availableProcessors() * 4; ExecutorService exec = Executors.newCachedThreadPool(); SEDAThreadPool pool = new SEDAThreadPool("consumer-1-", inputQueue, inputQueue2, exec, threadPoolSize, conflictMap); pool.createSEDAThreadPool(); SEDAThreadPool pool2 = new SEDAThreadPool("consumer-2-", inputQueue2, inputQueue3, exec, threadPoolSize, conflictMap); pool2.createSEDAThreadPool(); SEDAThreadPool pool3 = new SEDAThreadPool("consumer-3-", inputQueue3, exec, threadPoolSize, conflictMap); pool3.createSEDAThreadPool(); exec.execute(new Test().new Producer("producer", inputQueue)); exec.shutdown(); } private static ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task> test() { ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task> conflictMap = new ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task>(); for (int i = 0; i < 10; i++) { Test.TaskImpl t = new Test().new TaskImpl("task-" + i); conflictMap.put(t, t); } return conflictMap; } private class Producer implements Runnable { private final String name; private final BlockingQueue<SEDAThreadPool.Task> outputQueue; public Producer(String name, BlockingQueue<Task> outputQueue) { super(); this.name = name; this.outputQueue = outputQueue; } @Override public void run() { Thread.currentThread().setName(name); while (true) { try { for (SEDAThreadPool.Task t : getTasks()) { outputQueue.put(t); } TimeUnit.SECONDS.sleep(10); } catch (Exception e) { e.printStackTrace(); } } } private List<SEDAThreadPool.Task> getTasks() { List<SEDAThreadPool.Task> list = new ArrayList<SEDAThreadPool.Task>(); for (int i = 0; i < 10; i++) { list.add(new Test().new TaskImpl("task-" + i)); } return list; } } private class TaskImpl implements SEDAThreadPool.Task { private String name; public TaskImpl(String name) { super(); this.name = name; } @Override public int hashCode() { final int prime = 37; int result = 17; result = prime * result + getOuterType().hashCode(); result = prime * result + ((name == null) ? 0 : name.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; TaskImpl other = (TaskImpl) obj; if (!getOuterType().equals(other.getOuterType())) return false; if (name == null) { if (other.name != null) return false; } else if (!name.equals(other.name)) return false; return true; } @Override public String toString() { return "Task [name=" + name + "]"; } @Override public boolean execute() { try { TimeUnit.SECONDS.sleep(10); } catch (Exception e) { e.printStackTrace(); } return true; } private Test getOuterType() { return Test.this; } } }
参考: http://www.eecs.harvard.edu/~mdw/proj/seda/
下面添加的这些代码,只是个人记录一下。代码的正确性有待验证,日期:2017/05/12。
代码基于JDK1.8
/** * <pre> * Copyright baidu.com CDC [2000-2017] * </pre> */ package org.kanpiaoxue.seda; /** * <pre> * SEDAQueue.java * @author kanpiaoxue<br> * @version 1.0 * Create Time 2017年5月12日 下午3:34:24<br> * Description : SEDAQueue接口 * </pre> */ public interface SEDAQueue<E> { /** * <pre> * 放入元素,如果元素已存在则抛弃该元素 * @param e * @throws InterruptedException * </pre> */ void put(E e) throws InterruptedException; /** * <pre> * SEDAQueue 使用元素e开始工作 * @param work SEDAWork实例 * @throws InterruptedException * </pre> */ void work(SEDAWork<E> work) throws InterruptedException; }
/** * <pre> * Copyright baidu.com CDC [2000-2017] * </pre> */ package org.kanpiaoxue.seda; /** * <pre> * SEDAStateMachineService.java * @author kanpiaoxue<br> * @version 1.0 * Create Time 2017年5月12日 下午5:06:36<br> * Description : SEDA状态机服务 * </pre> */ public interface SEDAStateMachineService<E> { /** * <pre> * @param e 任务 * @return 是否正确的状态 * </pre> */ boolean isCorrectState(E e); }
/** * <pre> * Copyright baidu.com CDC [2000-2017] * </pre> */ package org.kanpiaoxue.seda; /** * <pre> * Work.java * @author kanpiaoxue<br> * @version 1.0 * Create Time 2017年5月12日 下午4:45:39<br> * Description : SEDA的work接口 * </pre> */ public interface SEDAWork<E> { /** * <pre> * 使用e开始工作 * @param e 工作元素 * </pre> */ void work(E e); }
/** * <pre> * Copyright baidu.com CDC [2000-2017] * </pre> */ package org.kanpiaoxue.seda; import org.kanpiaoxue.seda.impl.SEDAQueueWithStateMachineImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * <pre> * Test.java * @author kanpiaoxue<br> * @version 1.0 * Create Time 2017年5月12日 下午3:54:39<br> * Description : 测试类 * </pre> */ public class Test { private static final Logger LOGGER = LoggerFactory.getLogger(Test.class); public static void main(String[] args) throws Exception { final SEDAQueue<String> queue = new SEDAQueueWithStateMachineImpl<>(e -> { LOGGER.debug("{} was processed by stateMachine", e); return !e.contains("8"); }); ExecutorService e = Executors.newCachedThreadPool(); e.execute(() -> { Thread.currentThread().setName("Producer-0"); while (true) { try { List<String> list = Lists.newArrayList(); for (int i = 1; i < 10; i++) { String el = "hello" + i; if (i % 7 == 0) { el = "world"; } list.add(el); } for (String el : list) { queue.put(el); LOGGER.info("{} was put into queue", el); } long sleepTime = 10L; LOGGER.info("{} will sleep %s seconds", Thread.currentThread().getName(), sleepTime); TimeUnit.SECONDS.sleep(sleepTime); } catch (Exception ex) { ex.printStackTrace(); } } }); for (int i = 0, j = 4; i < j; i++) { final int num = i; e.execute(() -> { Thread.currentThread().setName("Consumer-" + num); while (true) { try { queue.work(w -> { LOGGER.info("[{}]{} was taken from queue", Thread.currentThread().getName(), w); try { TimeUnit.SECONDS.sleep(1L); } catch (Exception e1) { e1.printStackTrace(); } }); } catch (Exception e1) { e1.printStackTrace(); } } }); } e.shutdown(); e.awaitTermination(2, TimeUnit.DAYS); } }
package org.kanpiaoxue.seda.impl; import org.kanpiaoxue.seda.SEDAQueue; import org.kanpiaoxue.seda.SEDAStateMachineService; import org.kanpiaoxue.seda.SEDAWork; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; /** * <pre> * SEDAQueue.java * @author kanpiaoxue<br> * @version 1.0 * Create Time 2017年5月12日 下午2:55:24<br> * Description : SEDAQueue带有状态机服务的实现类 * </pre> */ public class SEDAQueueWithStateMachineImpl<E> implements SEDAQueue<E> { private static final Logger LOGGER = LoggerFactory.getLogger(SEDAQueueWithStateMachineImpl.class); private static final Object value = new Object(); private final ConcurrentMap<E, Object> outerQueue = new ConcurrentHashMap<>(); private final LinkedBlockingQueue<E> workingQueue = new LinkedBlockingQueue<>(); private final SEDAStateMachineService<E> sedaStateMachineService; /** * <pre> * @param sedaStateMachineService * </pre> */ public SEDAQueueWithStateMachineImpl(SEDAStateMachineService<E> sedaStateMachineService) { super(); Objects.requireNonNull(sedaStateMachineService, "sedaStateMachineService is null"); this.sedaStateMachineService = sedaStateMachineService; } /* * (non-Javadoc) * @see org.kanpiaoxue.seda.SEDAQueue#put(java.lang.Object) */ @Override public void put(E e) throws InterruptedException { Object returnValue = outerQueue.putIfAbsent(e, value); if (null != returnValue) { LOGGER.warn("{} was found in the queue. It was abandoned!", e); return; } boolean isCorrectState = sedaStateMachineService.isCorrectState(e); if (!isCorrectState) { LOGGER.warn("{}'s state is not correct. It was abandoned!", e); outerQueue.remove(e); return; } workingQueue.put(e); } /* * (non-Javadoc) * @see org.kanpiaoxue.seda.SEDAQueue#work(org.kanpiaoxue.seda.Work) */ @Override public void work(SEDAWork<E> work) throws InterruptedException { E e = workingQueue.take(); try { work.work(e); } finally { outerQueue.remove(e); } } }
相关推荐
采用分级事件驱动架构SEDA(Staged Event Driven Architecture),通过划分阶段Stage的方式解除耦合,在阶段之间采用事件进行异步消息通信,结合非阻塞的I/O机制设计实现了一个事件驱动的网格服务容器,并从吞吐量、...
Staged Event Driven Architecture (SEDA) 是加州大学伯克利分校研究的一套优秀的高性能互联网服务器架构模型
阶段事件驱动架构设计论文,详细介绍了seda的架构体系。
Staged Event Driven Architecture (SEDA) 是加州大学伯克利分校研究的一套优秀的高性能互联网服务器架构模型
we call the staged event-driven architecture (SEDA). SEDA is intended to support massive concurrency demands and simplify the construc- tion of well-conditioned services. In SEDA, applications consist...
针对Web服务集成过程中分阶段事件驱动架构(SEDA)仅考虑服务集成架构的资源消耗,而对被集成的服务及由其构成的任务资源耗费考虑不足的问题,提出了分阶段优先级事件驱动架构(SPEDA).选取评价指标,通过熵权法对事件...
NULL 博文链接:https://xylong.iteye.com/blog/1441956
有关 ehensin-seda 的最新信息,请访问我们的网站:
SEDA-分阶段的事件驱动架构 AEDA-基于演员的事件驱动架构 假设ASEDA-基于参与者的阶段性事件驱动架构 DASEDA-基于分布式参与者的分段事件驱动架构 该系统被设计和构建为React流系统,提供具有无阻塞背压的异步流处理...
基于SEDA架构的网格服务容器设计与实现.pdf
SEDA: An Architecture for Scalable,Well-Conditioned Internet ServicesMatt Welsh, David Culler, and Eric BrewerUC Berkeley Computer Science Division mdw@cs.berkeley.eduhttp://www.cs.berkeley.edu/~mdw/...
唤醒Wake是一个事件驱动的框架,基于SEDA,Click,Akka和Rx的思想。 从某种意义上说,它是通用的,旨在支持计算密集型应用程序以及高性能网络,存储和旧版I / O系统。 我们实现了Wake以支持高性能,可扩展的分析处理...
基于SEDA的企业服务总线的设计与设计,清晰地讲解了SEDA的结构与设计原理,为高并发网络响应提供了一种实现思路
【大纲】 服务端软件=排队服务 回顾常见的并发模型 介绍SEDA 分享我们的经验
NULL 博文链接:https://ailikes.iteye.com/blog/2233024
seda过载保护
An Architecture for Highly Concurrent, Well-Conditioned Internet ServicesbyMatthew David WelshB.S. (Cornell University) 1996 M.S. (University of California, Berkeley) 1999A dissertation submitted in ...
基于SEDA的企业服务总线的设计与实现,基于SEDA的企业服务总线的设计与实现
1、安装环境 2、安装依赖软件 3、安装配置Seda 4、测试seda能否正常工作
Aggregate Framework是基于DDD和CQRS思想而开发的一个领域驱动框架。其主要目标是方便开发人员运用DDD和CQRS思想来构建复杂的、可扩展的应用系统。该框架提供了最核心的构建块的实现,比如Aggregate、Repository和...