001/*
002 * Sonar, open source software quality management tool.
003 * Copyright (C) 2008-2012 SonarSource
004 * mailto:contact AT sonarsource DOT com
005 *
006 * Sonar is free software; you can redistribute it and/or
007 * modify it under the terms of the GNU Lesser General Public
008 * License as published by the Free Software Foundation; either
009 * version 3 of the License, or (at your option) any later version.
010 *
011 * Sonar is distributed in the hope that it will be useful,
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
014 * Lesser General Public License for more details.
015 *
016 * You should have received a copy of the GNU Lesser General Public
017 * License along with Sonar; if not, write to the Free Software
018 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02
019 */
020package org.sonar.server.notifications;
021
022import com.google.common.collect.HashMultimap;
023import com.google.common.collect.SetMultimap;
024import com.google.common.collect.Sets;
025import org.sonar.api.*;
026import org.sonar.api.config.Settings;
027import org.sonar.api.notifications.Notification;
028import org.sonar.api.notifications.NotificationChannel;
029import org.sonar.api.notifications.NotificationDispatcher;
030import org.sonar.api.utils.Logs;
031import org.sonar.api.utils.TimeProfiler;
032import org.sonar.core.notification.DefaultNotificationManager;
033import org.sonar.core.notification.NotificationQueueElement;
034
035import java.util.*;
036import java.util.concurrent.Executors;
037import java.util.concurrent.ScheduledExecutorService;
038import java.util.concurrent.TimeUnit;
039
040/**
041 * @since 2.10
042 */
043@org.sonar.api.Properties({
044    @Property(
045        key = NotificationService.PROPERTY_DELAY,
046        defaultValue = "60",
047        name = "Delay of notifications, in seconds",
048        project = false,
049        global = false)
050})
051public class NotificationService implements ServerComponent {
052
053  private static final TimeProfiler TIME_PROFILER = new TimeProfiler(Logs.INFO).setLevelToDebug();
054
055  public static final String PROPERTY_DELAY = "sonar.notifications.delay";
056
057  private ScheduledExecutorService executorService;
058  private long delayInSeconds;
059
060  private DefaultNotificationManager manager;
061  private NotificationChannel[] channels;
062  private NotificationDispatcher[] dispatchers;
063
064  private boolean stopping = false;
065
066  /**
067   * Default constructor when no channels.
068   */
069  public NotificationService(Settings settings, DefaultNotificationManager manager, NotificationDispatcher[] dispatchers) {
070    this(settings, manager, dispatchers, new NotificationChannel[0]);
071    Logs.INFO.warn("There is no channels - all notifications would be ignored!");
072  }
073
074  public NotificationService(Settings settings, DefaultNotificationManager manager, NotificationDispatcher[] dispatchers, NotificationChannel[] channels) {
075    delayInSeconds = settings.getLong(PROPERTY_DELAY);
076    this.manager = manager;
077    this.channels = channels;
078    this.dispatchers = dispatchers;
079  }
080
081  public void start() {
082    executorService = Executors.newSingleThreadScheduledExecutor();
083    executorService.scheduleWithFixedDelay(new Runnable() {
084      public void run() {
085        processQueue();
086      }
087    }, 0, delayInSeconds, TimeUnit.SECONDS);
088    Logs.INFO.info("Notification service started (delay {} sec.)", delayInSeconds);
089  }
090
091  public void stop() {
092    try {
093      stopping = true;
094      executorService.awaitTermination(5, TimeUnit.SECONDS);
095      executorService.shutdown();
096    } catch (InterruptedException e) {
097      Logs.INFO.error("Error during stop of notification service", e);
098    }
099    Logs.INFO.info("Notification service stopped");
100  }
101
102  /**
103   * Visibility has been relaxed for tests.
104   */
105  void processQueue() {
106    TIME_PROFILER.start("Processing notifications queue");
107    NotificationQueueElement queueElement = manager.getFromQueue();
108    while (queueElement != null) {
109      deliver(queueElement.getNotification());
110      if (stopping) {
111        break;
112      }
113      queueElement = manager.getFromQueue();
114    }
115    TIME_PROFILER.stop();
116  }
117
118  /**
119   * Visibility has been relaxed for tests.
120   */
121  void deliver(Notification notification) {
122    Logs.INFO.debug("Delivering notification " + notification);
123    SetMultimap<String, NotificationChannel> recipients = HashMultimap.create();
124    for (NotificationChannel channel : channels) {
125      for (NotificationDispatcher dispatcher : dispatchers) {
126        final Set<String> possibleRecipients = Sets.newHashSet();
127        NotificationDispatcher.Context context = new NotificationDispatcher.Context() {
128          public void addUser(String username) {
129            if (username != null) {
130              possibleRecipients.add(username);
131            }
132          }
133        };
134        try {
135          dispatcher.dispatch(notification, context);
136        } catch (Exception e) { // catch all exceptions in order to dispatch using other dispatchers
137          Logs.INFO.warn("Unable to dispatch notification " + notification + " using " + dispatcher, e);
138        }
139        for (String username : possibleRecipients) {
140          if (manager.isEnabled(username, channel.getKey(), dispatcher.getKey())) {
141            recipients.put(username, channel);
142          }
143        }
144      }
145    }
146    for (Map.Entry<String, Collection<NotificationChannel>> entry : recipients.asMap().entrySet()) {
147      String username = entry.getKey();
148      Collection<NotificationChannel> userChannels = entry.getValue();
149      Logs.INFO.debug("For user {} via {}", username, userChannels);
150      for (NotificationChannel channel : userChannels) {
151        try {
152          channel.deliver(notification, username);
153        } catch (Exception e) { // catch all exceptions in order to deliver via other channels
154          Logs.INFO.warn("Unable to deliver notification " + notification + " for user " + username + " via " + channel, e);
155        }
156      }
157    }
158  }
159
160  public List<NotificationDispatcher> getDispatchers() {
161    return Arrays.asList(dispatchers);
162  }
163
164  public List<NotificationChannel> getChannels() {
165    return Arrays.asList(channels);
166  }
167
168}