001 /*
002 * Sonar, open source software quality management tool.
003 * Copyright (C) 2008-2012 SonarSource
004 * mailto:contact AT sonarsource DOT com
005 *
006 * Sonar is free software; you can redistribute it and/or
007 * modify it under the terms of the GNU Lesser General Public
008 * License as published by the Free Software Foundation; either
009 * version 3 of the License, or (at your option) any later version.
010 *
011 * Sonar is distributed in the hope that it will be useful,
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014 * Lesser General Public License for more details.
015 *
016 * You should have received a copy of the GNU Lesser General Public
017 * License along with Sonar; if not, write to the Free Software
018 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02
019 */
020 package org.sonar.server.notifications;
021
022 import com.google.common.collect.HashMultimap;
023 import com.google.common.collect.SetMultimap;
024 import com.google.common.collect.Sets;
025 import org.sonar.api.*;
026 import org.sonar.api.config.Settings;
027 import org.sonar.api.notifications.Notification;
028 import org.sonar.api.notifications.NotificationChannel;
029 import org.sonar.api.notifications.NotificationDispatcher;
030 import org.sonar.api.utils.Logs;
031 import org.sonar.api.utils.TimeProfiler;
032 import org.sonar.core.notification.DefaultNotificationManager;
033 import org.sonar.core.notification.NotificationQueueElement;
034
035 import java.util.*;
036 import java.util.concurrent.Executors;
037 import java.util.concurrent.ScheduledExecutorService;
038 import java.util.concurrent.TimeUnit;
039
040 /**
041 * @since 2.10
042 */
043 @org.sonar.api.Properties({
044 @Property(
045 key = NotificationService.PROPERTY_DELAY,
046 defaultValue = "60",
047 name = "Delay of notifications, in seconds",
048 project = false,
049 global = false)
050 })
051 public class NotificationService implements ServerComponent {
052
053 private static final TimeProfiler TIME_PROFILER = new TimeProfiler(Logs.INFO).setLevelToDebug();
054
055 public static final String PROPERTY_DELAY = "sonar.notifications.delay";
056
057 private ScheduledExecutorService executorService;
058 private long delayInSeconds;
059
060 private DefaultNotificationManager manager;
061 private NotificationChannel[] channels;
062 private NotificationDispatcher[] dispatchers;
063
064 private boolean stopping = false;
065
066 /**
067 * Default constructor when no channels.
068 */
069 public NotificationService(Settings settings, DefaultNotificationManager manager, NotificationDispatcher[] dispatchers) {
070 this(settings, manager, dispatchers, new NotificationChannel[0]);
071 Logs.INFO.warn("There is no channels - all notifications would be ignored!");
072 }
073
074 public NotificationService(Settings settings, DefaultNotificationManager manager, NotificationDispatcher[] dispatchers, NotificationChannel[] channels) {
075 delayInSeconds = settings.getLong(PROPERTY_DELAY);
076 this.manager = manager;
077 this.channels = channels;
078 this.dispatchers = dispatchers;
079 }
080
081 public void start() {
082 executorService = Executors.newSingleThreadScheduledExecutor();
083 executorService.scheduleWithFixedDelay(new Runnable() {
084 public void run() {
085 processQueue();
086 }
087 }, 0, delayInSeconds, TimeUnit.SECONDS);
088 Logs.INFO.info("Notification service started (delay {} sec.)", delayInSeconds);
089 }
090
091 public void stop() {
092 try {
093 stopping = true;
094 executorService.awaitTermination(5, TimeUnit.SECONDS);
095 executorService.shutdown();
096 } catch (InterruptedException e) {
097 Logs.INFO.error("Error during stop of notification service", e);
098 }
099 Logs.INFO.info("Notification service stopped");
100 }
101
102 /**
103 * Visibility has been relaxed for tests.
104 */
105 void processQueue() {
106 TIME_PROFILER.start("Processing notifications queue");
107 NotificationQueueElement queueElement = manager.getFromQueue();
108 while (queueElement != null) {
109 deliver(queueElement.getNotification());
110 if (stopping) {
111 break;
112 }
113 queueElement = manager.getFromQueue();
114 }
115 TIME_PROFILER.stop();
116 }
117
118 /**
119 * Visibility has been relaxed for tests.
120 */
121 void deliver(Notification notification) {
122 Logs.INFO.debug("Delivering notification " + notification);
123 SetMultimap<String, NotificationChannel> recipients = HashMultimap.create();
124 for (NotificationChannel channel : channels) {
125 for (NotificationDispatcher dispatcher : dispatchers) {
126 final Set<String> possibleRecipients = Sets.newHashSet();
127 NotificationDispatcher.Context context = new NotificationDispatcher.Context() {
128 public void addUser(String username) {
129 if (username != null) {
130 possibleRecipients.add(username);
131 }
132 }
133 };
134 try {
135 dispatcher.dispatch(notification, context);
136 } catch (Exception e) { // catch all exceptions in order to dispatch using other dispatchers
137 Logs.INFO.warn("Unable to dispatch notification " + notification + " using " + dispatcher, e);
138 }
139 for (String username : possibleRecipients) {
140 if (manager.isEnabled(username, channel.getKey(), dispatcher.getKey())) {
141 recipients.put(username, channel);
142 }
143 }
144 }
145 }
146 for (Map.Entry<String, Collection<NotificationChannel>> entry : recipients.asMap().entrySet()) {
147 String username = entry.getKey();
148 Collection<NotificationChannel> userChannels = entry.getValue();
149 Logs.INFO.debug("For user {} via {}", username, userChannels);
150 for (NotificationChannel channel : userChannels) {
151 try {
152 channel.deliver(notification, username);
153 } catch (Exception e) { // catch all exceptions in order to deliver via other channels
154 Logs.INFO.warn("Unable to deliver notification " + notification + " for user " + username + " via " + channel, e);
155 }
156 }
157 }
158 }
159
160 public List<NotificationDispatcher> getDispatchers() {
161 return Arrays.asList(dispatchers);
162 }
163
164 public List<NotificationChannel> getChannels() {
165 return Arrays.asList(channels);
166 }
167
168 }