maven
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-bus</artifactId>
</dependency>
config
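import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.Environment;
import reactor.bus.EventBus;

import static reactor.bus.selector.Selectors.$;

@Configuration
public class EventConfig {

    // Shared Reactor environment; holds the configured dispatchers.
    @Bean
    Environment environment() {
        return Environment.initializeIfEmpty().assignErrorJournal();
    }

    // Event bus backed by the thread pool dispatcher, with the listener
    // registered under the "myevent" key.
    @Bean
    @Autowired
    public EventBus eventBus(Environment environment, MyEventListener myEventListener) {
        EventBus eventBus = EventBus.create(environment, Environment.THREAD_POOL);
        eventBus.on($("myevent"), myEventListener);
        return eventBus;
    }
}

The default Environment configuration is read from reactor-core-2.0.8.RELEASE.jar!/META-INF/reactor/reactor-environment.properties: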
#
# Copyright (c) 2011-2015 Pivotal Software Inc., Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

##
# Dispatcher configuration
#
# Each dispatcher must be configured with a type:
#
# reactor.dispatchers.<name>.type = <type>
#
# Legal values for <type> are dispatcherGroup, ringBuffer, synchronous, and workQueue.
# Depending on the type, further configuration is be possible:
#
# reactor.dispatchers.<name>.size:    dispatcherGroup and workQueue Dispatchers
# reactor.dispatchers.<name>.backlog: dispatcherGroup, ringBuffer, and workQueue Dispatchers
#
# A size less than 1 may be specified to indicate that the size should be the same as the number
# of CPUs.

# A thread pool executor dispatcher, named threadPoolExecutor
reactor.dispatchers.threadPoolExecutor.type = threadPoolExecutor
reactor.dispatchers.threadPoolExecutor.size = 0
# Backlog is how many Task objects to warm up internally
reactor.dispatchers.threadPoolExecutor.backlog = 2048

# A group of dispatchers replicated from the default dispatcher, named dispatcherGroup
reactor.dispatchers.dispatcherGroup.type = dispatcherGroup
reactor.dispatchers.dispatcherGroup.size = 0
reactor.dispatchers.dispatcherGroup.backlog = 2048

# A ring buffer dispatcher, named ringBuffer
reactor.dispatchers.shared.type = ringBuffer
reactor.dispatchers.shared.backlog = 8192

# A work queue dispatcher, named workQueue
reactor.dispatchers.workQueue.type = workQueue
reactor.dispatchers.workQueue.size = 0
reactor.dispatchers.workQueue.backlog = 2048

# The dispatcher named shared should be the default dispatcher
reactor.dispatchers.default = shared
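The comment in the properties file says that a size below 1 means "use the number of CPUs". The following is a minimal JDK-only sketch of that rule, not Reactor's own code: the class DispatcherSizeSketch and the helpers resolveSize and configuredSize are hypothetical names introduced here for illustration, and Reactor's actual getSize implementation is not shown in this post.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class DispatcherSizeSketch {

    // Hypothetical helper: a configured size below 1 falls back to the CPU count.
    static int resolveSize(int configured) {
        return configured < 1 ? Runtime.getRuntime().availableProcessors() : configured;
    }

    // Hypothetical helper: reads reactor.dispatchers.<name>.size from properties
    // text, treating a missing entry as 0.
    static int configuredSize(String propertiesText, String dispatcherName) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String raw = props.getProperty("reactor.dispatchers." + dispatcherName + ".size", "0");
        return Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        String text = "reactor.dispatchers.threadPoolExecutor.type = threadPoolExecutor\n"
                + "reactor.dispatchers.threadPoolExecutor.size = 0\n"
                + "reactor.dispatchers.threadPoolExecutor.backlog = 2048\n";
        int configured = configuredSize(text, "threadPoolExecutor");
        System.out.println("configured=" + configured + ", effective=" + resolveSize(configured));
    }
}
```

With the default size of 0, the effective pool size therefore depends on the machine the application runs on.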
event listener
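import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.bus.Event;
import reactor.fn.Consumer;

@Component
public class MyEventListener implements Consumer<Event<MyEvent>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyEventListener.class);

    // Called by the dispatcher; logs the handling thread and the event payload.
    @Override
    public void accept(Event<MyEvent> eventContextEvent) {
        MyEvent event = eventContextEvent.getData();
        LOGGER.info("thread {} ,receive event:{}", Thread.currentThread().getName(), event.getData());
    }
}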
publish event
@Autowired
EventBus eventBus;

public void publishEvent(String data) {
    eventBus.notify("myevent", Event.wrap(new MyEvent(data)));
}
ThreadPoolExecutorDispatcher
reactor-core-2.0.8.RELEASE-sources.jar!/reactor/core/dispatch/ThreadPoolExecutorDispatcher.java
The default ThreadPoolExecutorDispatcher is created in reactor-core-2.0.8.RELEASE-sources.jar!/reactor/Environment.java:

private static ThreadPoolExecutorDispatcher createThreadPoolExecutorDispatcher(DispatcherConfiguration dispatcherConfiguration) {
    int size = getSize(dispatcherConfiguration, 0);
    int backlog = getBacklog(dispatcherConfiguration, 128);

    return new ThreadPoolExecutorDispatcher(size, backlog, dispatcherConfiguration.getName());
}

Constructor
/**
 * Create a new {@literal ThreadPoolExecutorDispatcher} with the given size, backlog, name, and {@link
 * java.util.concurrent.RejectedExecutionHandler}.
 *
 * @param poolSize
 *         the pool size
 * @param backlog
 *         the backlog size
 * @param threadName
 *         the name prefix to use when creating threads
 */
public ThreadPoolExecutorDispatcher(int poolSize, int backlog, String threadName) {
    this(poolSize,
         backlog,
         threadName,
         new LinkedBlockingQueue<Runnable>(backlog),
         new RejectedExecutionHandler() {
             @Override
             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                 r.run();
             }
         });
}
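By default the dispatcher uses a LinkedBlockingQueue whose capacity is the backlog given in the configuration file, and a RejectedExecutionHandler that runs the rejected task on the calling thread. This is the same caller-runs idea as the JDK's ThreadPoolExecutor.CallerRunsPolicy: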
/**
 * A handler for rejected tasks that runs the rejected task
 * directly in the calling thread of the {@code execute} method,
 * unless the executor has been shut down, in which case the task
 * is discarded.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
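One difference between the two handlers is visible in the code above: the handler passed by the ThreadPoolExecutorDispatcher constructor calls r.run() unconditionally, while CallerRunsPolicy first checks isShutdown(). The sketch below illustrates that difference using only the JDK. The class RejectionAfterShutdownSketch and the method runsAfterShutdown are hypothetical names, and the plain ThreadPoolExecutor here only stands in for the dispatcher's internal pool, which may be set up differently.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RejectionAfterShutdownSketch {

    // Returns true if a task submitted after shutdown() was still executed.
    static boolean runsAfterShutdown(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1), handler);
        pool.shutdown();
        AtomicBoolean ran = new AtomicBoolean(false);
        // The pool is shut down, so this task goes straight to the handler.
        pool.execute(() -> ran.set(true));
        return ran.get();
    }

    public static void main(String[] args) {
        // Same body as the anonymous handler in the dispatcher constructor.
        RejectedExecutionHandler unconditional = (r, executor) -> r.run();
        System.out.println("unconditional r.run(): " + runsAfterShutdown(unconditional));
        System.out.println("CallerRunsPolicy: "
                + runsAfterShutdown(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

In this sketch the unconditional handler still runs the task on the caller after shutdown, whereas CallerRunsPolicy discards it.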
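As a result, publishing is asynchronous while the queue still has room. Once the queue is full and every pool thread is busy, the rejected task runs on the publishing thread, so the publisher blocks until that task finishes.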
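This behaviour can be reproduced with a plain JDK ThreadPoolExecutor. The sketch below is not Reactor code: it assumes a pool of one thread and a queue of capacity 1 (far smaller than the real backlog) so that the third task is rejected, and the names CallerRunsBacklogSketch, runDemo, and awaitQuietly are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsBacklogSketch {

    // Waits on the latch, converting interruption into an unchecked exception.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Submits three tasks and returns "taskN@threadName" entries in completion order.
    static List<String> runDemo() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1),
                (r, executor) -> r.run()); // same body as the dispatcher's handler

        // Task 1 occupies the only worker thread until the latch is released.
        pool.execute(() -> {
            awaitQuietly(release);
            log.add("task1@" + Thread.currentThread().getName());
        });
        // Task 2 fits into the queue and waits there (asynchronous for the caller).
        pool.execute(() -> log.add("task2@" + Thread.currentThread().getName()));
        // Task 3 finds the queue full, is rejected, and runs on the calling thread.
        pool.execute(() -> log.add("task3@" + Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        runDemo().forEach(System.out::println);
    }
}
```

The first entry in the log is task 3, tagged with the name of the submitting thread, because it ran synchronously inside execute() before the worker was released.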