001 /* 002 * Sonar, open source software quality management tool. 003 * Copyright (C) 2008-2011 SonarSource 004 * mailto:contact AT sonarsource DOT com 005 * 006 * Sonar is free software; you can redistribute it and/or 007 * modify it under the terms of the GNU Lesser General Public 008 * License as published by the Free Software Foundation; either 009 * version 3 of the License, or (at your option) any later version. 010 * 011 * Sonar is distributed in the hope that it will be useful, 012 * but WITHOUT ANY WARRANTY; without even the implied warranty of 013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 014 * Lesser General Public License for more details. 015 * 016 * You should have received a copy of the GNU Lesser General Public 017 * License along with Sonar; if not, write to the Free Software 018 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02 019 */ 020 package org.sonar.server.notifications; 021 022 import com.google.common.collect.HashMultimap; 023 import com.google.common.collect.SetMultimap; 024 import com.google.common.collect.Sets; 025 import org.sonar.api.*; 026 import org.sonar.api.config.Settings; 027 import org.sonar.api.notifications.Notification; 028 import org.sonar.api.notifications.NotificationChannel; 029 import org.sonar.api.notifications.NotificationDispatcher; 030 import org.sonar.api.utils.Logs; 031 import org.sonar.api.utils.TimeProfiler; 032 import org.sonar.core.notification.DefaultNotificationManager; 033 import org.sonar.core.notification.NotificationQueueElement; 034 035 import java.util.*; 036 import java.util.concurrent.Executors; 037 import java.util.concurrent.ScheduledExecutorService; 038 import java.util.concurrent.TimeUnit; 039 040 /** 041 * @since 2.10 042 */ 043 @org.sonar.api.Properties({ 044 @Property( 045 key = NotificationService.PROPERTY_DELAY, 046 defaultValue = "60", 047 name = "Delay of notifications, in seconds", 048 project = false, 049 global = false) 050 }) 051 public class NotificationService implements ServerComponent { 052 053 private static final TimeProfiler TIME_PROFILER = new TimeProfiler(Logs.INFO).setLevelToDebug(); 054 055 public static final String PROPERTY_DELAY = "sonar.notifications.delay"; 056 057 private ScheduledExecutorService executorService; 058 private long delayInSeconds; 059 060 private DefaultNotificationManager manager; 061 private NotificationChannel[] channels; 062 private NotificationDispatcher[] dispatchers; 063 064 private boolean stopping = false; 065 066 /** 067 * Default constructor when no channels. 068 */ 069 public NotificationService(Settings settings, DefaultNotificationManager manager, NotificationDispatcher[] dispatchers) { 070 this(settings, manager, dispatchers, new NotificationChannel[0]); 071 Logs.INFO.warn("There is no channels - all notifications would be ignored!"); 072 } 073 074 public NotificationService(Settings settings, DefaultNotificationManager manager, NotificationDispatcher[] dispatchers, NotificationChannel[] channels) { 075 delayInSeconds = settings.getLong(PROPERTY_DELAY); 076 this.manager = manager; 077 this.channels = channels; 078 this.dispatchers = dispatchers; 079 } 080 081 public void start() { 082 executorService = Executors.newSingleThreadScheduledExecutor(); 083 executorService.scheduleWithFixedDelay(new Runnable() { 084 public void run() { 085 processQueue(); 086 } 087 }, 0, delayInSeconds, TimeUnit.SECONDS); 088 Logs.INFO.info("Notification service started (delay {} sec.)", delayInSeconds); 089 } 090 091 public void stop() { 092 try { 093 stopping = true; 094 executorService.awaitTermination(5, TimeUnit.SECONDS); 095 executorService.shutdown(); 096 } catch (InterruptedException e) { 097 Logs.INFO.error("Error during stop of notification service", e); 098 } 099 Logs.INFO.info("Notification service stopped"); 100 } 101 102 /** 103 * Visibility has been relaxed for tests. 104 */ 105 void processQueue() { 106 TIME_PROFILER.start("Processing notifications queue"); 107 NotificationQueueElement queueElement = manager.getFromQueue(); 108 while (queueElement != null) { 109 deliver(queueElement.getNotification()); 110 if (stopping) { 111 break; 112 } 113 queueElement = manager.getFromQueue(); 114 } 115 TIME_PROFILER.stop(); 116 } 117 118 /** 119 * Visibility has been relaxed for tests. 120 */ 121 void deliver(Notification notification) { 122 Logs.INFO.debug("Delivering notification " + notification); 123 SetMultimap<String, NotificationChannel> recipients = HashMultimap.create(); 124 for (NotificationChannel channel : channels) { 125 for (NotificationDispatcher dispatcher : dispatchers) { 126 final Set<String> possibleRecipients = Sets.newHashSet(); 127 NotificationDispatcher.Context context = new NotificationDispatcher.Context() { 128 public void addUser(String username) { 129 if (username != null) { 130 possibleRecipients.add(username); 131 } 132 } 133 }; 134 try { 135 dispatcher.dispatch(notification, context); 136 } catch (Exception e) { // catch all exceptions in order to dispatch using other dispatchers 137 Logs.INFO.warn("Unable to dispatch notification " + notification + " using " + dispatcher, e); 138 } 139 for (String username : possibleRecipients) { 140 if (manager.isEnabled(username, channel.getKey(), dispatcher.getKey())) { 141 recipients.put(username, channel); 142 } 143 } 144 } 145 } 146 for (Map.Entry<String, Collection<NotificationChannel>> entry : recipients.asMap().entrySet()) { 147 String username = entry.getKey(); 148 Collection<NotificationChannel> userChannels = entry.getValue(); 149 Logs.INFO.debug("For user {} via {}", username, userChannels); 150 for (NotificationChannel channel : userChannels) { 151 try { 152 channel.deliver(notification, username); 153 } catch (Exception e) { // catch all exceptions in order to deliver via other channels 154 Logs.INFO.warn("Unable to deliver notification " + notification + " for user " + username + " via " + channel, e); 155 } 156 } 157 } 158 } 159 160 public List<NotificationDispatcher> getDispatchers() { 161 return Arrays.asList(dispatchers); 162 } 163 164 public List<NotificationChannel> getChannels() { 165 return Arrays.asList(channels); 166 } 167 168 }