From 045941d95f113b8665916578aa26721ec3e23e2a Mon Sep 17 00:00:00 2001
From: Lyubomir Marinov <lyubomir.marinov@jitsi.org>
Date: Thu, 29 May 2014 06:45:13 +0300
Subject: [PATCH] Works on dominant speaker detection for the purposes of Jitsi
 Videobridge.

---
 .../neomedia/ActiveSpeakerDetectorImpl.java   | 232 ++++++++++++++++
 .../neomedia/BasicActiveSpeakerDetector.java  | 260 ++++++++++++++++++
 .../csrc/CsrcAudioLevelDispatcher.java        | 183 ++++++++++++
 .../transform/csrc/CsrcTransformEngine.java   | 110 ++------
 .../transform/csrc/SsrcTransformEngine.java   |  32 +++
 .../AbstractActiveSpeakerDetector.java        | 112 ++++++++
 .../neomedia/ActiveSpeakerDetector.java       |  61 ++++
 .../event/ActiveSpeakerChangedListener.java   |  26 ++
 8 files changed, 922 insertions(+), 94 deletions(-)
 create mode 100644 src/org/jitsi/impl/neomedia/ActiveSpeakerDetectorImpl.java
 create mode 100644 src/org/jitsi/impl/neomedia/BasicActiveSpeakerDetector.java
 create mode 100644 src/org/jitsi/impl/neomedia/transform/csrc/CsrcAudioLevelDispatcher.java
 create mode 100644 src/org/jitsi/service/neomedia/AbstractActiveSpeakerDetector.java
 create mode 100644 src/org/jitsi/service/neomedia/ActiveSpeakerDetector.java
 create mode 100644 src/org/jitsi/service/neomedia/event/ActiveSpeakerChangedListener.java

diff --git a/src/org/jitsi/impl/neomedia/ActiveSpeakerDetectorImpl.java b/src/org/jitsi/impl/neomedia/ActiveSpeakerDetectorImpl.java
new file mode 100644
index 00000000..9f2e0663
--- /dev/null
+++ b/src/org/jitsi/impl/neomedia/ActiveSpeakerDetectorImpl.java
@@ -0,0 +1,232 @@
+/*
+ * Jitsi, the OpenSource Java VoIP and Instant Messaging client.
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jitsi.impl.neomedia;
+
+import java.util.*;
+
+import org.jitsi.service.configuration.*;
+import org.jitsi.service.libjitsi.*;
+import org.jitsi.service.neomedia.*;
+import org.jitsi.service.neomedia.event.*;
+
+/**
+ * Implements an {@link ActiveSpeakerDetector} (factory) which uses/delegates to
+ * an actual algorithm implementation for the detections/identification of the
+ * active/dominant speaker in a multipoint conference.
+ *
+ * @author Lyubomir Marinov
+ */
+public class ActiveSpeakerDetectorImpl
+    implements ActiveSpeakerDetector
+{
+    /**
+     * The name of the <tt>ConfigurationService</tt> property which specifies
+     * the class name of the algorithm implementation for the
+     * detection/identification of the active/dominant speaker in a multipoint
+     * conference to be used by <tt>ActiveSpeakerDetectorImpl</tt>. The default
+     * value is <tt>null</tt>. If the specified value is <tt>null</tt> or the
+     * initialization of an instance of the specified class fails,
+     * <tt>ActiveSpeakerDetectorImpl</tt> falls back to a list of well-known
+     * algorithm implementations.
+     */
+    private static final String IMPL_CLASS_NAME_PNAME
+        = ActiveSpeakerDetectorImpl.class.getName() + ".implClassName";
+
+    /**
+     * The names of the classes known by <tt>ActiveSpeakerDetectorImpl</tt> to
+     * implement actual algorithms for the detection/identification of the
+     * active/dominant speaker in a multipoint conference. 
+     */
+    private static final String[] IMPL_CLASS_NAMES
+        = {
+            ".DominantSpeakerIdentification",
+            ".BasicActiveSpeakerDetector"
+        };
+
+    /**
+     * The actual algorithm implementation to be used by this instance for the
+     * detection/identification of the active/dominant speaker in a multipoint
+     * conference.
+     */
+    private final ActiveSpeakerDetector impl;
+
+    /**
+     * Initializes a new <tt>ActiveSpeakerDetectorImpl</tt> which is to use a
+     * default algorithm implementation for the detection/identification of the
+     * active/dominant speaker in a multipoint conference.
+     */
+    public ActiveSpeakerDetectorImpl()
+    {
+        this(getImplClassNames());
+    }
+
+    /**
+     * Initializes a new <tt>ActiveSpeakerDetectorImpl</tt> which is to use the
+     * first available algorithm from a specific list of algorithms (identified
+     * by the names of their implementing classes) for the
+     * detection/identification of the active/dominant speaker in a multipoint
+     * conference.
+     * 
+     * @param implClassNames the class names of the algorithm implementations to
+     * search through and in which the first available is to be found and used
+     * for the detection/identification of the active/dominant speaker in a
+     * multipoint conference
+     * @throws RuntimeException if none of the algorithm implementations
+     * specified by <tt>implClassNames</tt> is available
+     */
+    public ActiveSpeakerDetectorImpl(String... implClassNames)
+    {
+        ActiveSpeakerDetector impl = null;
+        Throwable cause = null;
+
+        for (String implClassName : implClassNames)
+        {
+            try
+            {
+                Class<?> implClass
+                    = Class.forName(normalizeClassName(implClassName));
+
+                if ((implClass != null)
+                        && ActiveSpeakerDetector.class.isAssignableFrom(
+                                implClass))
+                {
+                    impl = (ActiveSpeakerDetector) implClass.newInstance();
+                }
+            }
+            catch (Throwable t)
+            {
+                if (t instanceof InterruptedException)
+                    Thread.currentThread().interrupt();
+                else if (t instanceof ThreadDeath)
+                    throw (ThreadDeath) t;
+                else
+                    cause = t;
+            }
+            if (impl != null)
+                break;
+        }
+        if (impl == null)
+        {
+            throw new RuntimeException(
+                    "Failed to initialize an actual ActiveSpeakerDetector"
+                        + " implementation, tried classes: "
+                        + Arrays.toString(implClassNames),
+                    cause);
+        }
+        else
+        {
+            this.impl = impl;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void addActiveSpeakerChangedListener(
+            ActiveSpeakerChangedListener listener)
+    {
+        impl.addActiveSpeakerChangedListener(listener);
+    }
+
+    /**
+     * Gets the names of the classes known by <tt>ActiveSpeakerDetectorImpl</tt>
+     * to implement actual algorithms for the detection/identification of the
+     * active/dominant speaker in a multipoint conference. If the
+     * <tt>ConfigurationService</tt> property {@link #IMPL_CLASS_NAME_PNAME}
+     * specifies a class name, it is prepended to the returned array.
+     *
+     * @return the names of the classes known by
+     * <tt>ActiveSpeakerDetectorImpl</tt> to implement actual algorithms for the
+     * detection/identification of the active/dominant speaker in a multipoint
+     * conference
+     */
+    private static String[] getImplClassNames()
+    {
+        /*
+         * The user is allowed to specify the class name of the (default)
+         * algorithm implementation through the ConfigurationService.
+         */
+        ConfigurationService cfg = LibJitsi.getConfigurationService();
+        String implClassName = null;
+
+        if (cfg != null)
+        {
+            implClassName = cfg.getString(IMPL_CLASS_NAME_PNAME);
+            if ((implClassName != null) && (implClassName.length() == 0))
+                implClassName = null;
+        }
+
+        /*
+         * Should the user's choice with respect to the algorithm implementation
+         * fail, ActiveSpeakerDetectorImpl falls back to well-known algorithm
+         * implementations.   
+         */
+        String[] implClassNames;
+
+        if (implClassName == null)
+        {
+            implClassNames = IMPL_CLASS_NAMES;
+        }
+        else
+        {
+            List<String> implClassNameList
+                = new ArrayList<String>(1 + IMPL_CLASS_NAMES.length);
+
+            implClassNameList.add(normalizeClassName(implClassName));
+            for (String anImplClassName : IMPL_CLASS_NAMES)
+            {
+                anImplClassName = normalizeClassName(anImplClassName);
+                if (!implClassNameList.contains(anImplClassName))
+                    implClassNameList.add(anImplClassName);
+            }
+
+            implClassNames
+                = implClassNameList.toArray(
+                        new String[implClassNameList.size()]);
+        }
+        return implClassNames;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void levelChanged(long ssrc, int level)
+    {
+        impl.levelChanged(ssrc, level);
+    }
+
+    /**
+     * Makes sure that a specific class name starts with a package name.
+     *
+     * @param className the class name to prefix with a package name if
+     * necessary
+     * @return a class name with a package name and a simple name
+     */
+    private static String normalizeClassName(String className)
+    {
+        if (className.startsWith("."))
+        {
+            Package pkg = ActiveSpeakerDetectorImpl.class.getPackage();
+
+            if (pkg != null)
+                className = pkg.getName() + className;
+        }
+        return className;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void removeActiveSpeakerChangedListener(
+            ActiveSpeakerChangedListener listener)
+    {
+        impl.removeActiveSpeakerChangedListener(listener);
+    }
+}
diff --git a/src/org/jitsi/impl/neomedia/BasicActiveSpeakerDetector.java b/src/org/jitsi/impl/neomedia/BasicActiveSpeakerDetector.java
new file mode 100644
index 00000000..23ee2590
--- /dev/null
+++ b/src/org/jitsi/impl/neomedia/BasicActiveSpeakerDetector.java
@@ -0,0 +1,260 @@
+/*
+ * Jitsi, the OpenSource Java VoIP and Instant Messaging client.
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jitsi.impl.neomedia;
+
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+import org.jitsi.service.neomedia.*;
+
+/**
+ * {@inheritDoc}
+ * <p>
+ * For each stream, it keeps the last {@link History#SIZE} levels and uses them
+ * to compute a score.
+ * </p>
+ * <p>
+ * At all times one stream (indicated in {@link #active}) is considered active,
+ * while the rest are considered 'competing'.
+ * </p>
+ * <p>
+ * When a new audio level is received for a stream, its score is recomputed, and
+ * the scores of all streams are examined in order to determine if one of the
+ * competing streams should replace the then active stream.
+ * </p>
+ * <p>
+ * In order to be eligible to replace the active stream, a competing stream has
+ * to:
+ * 1. Have a score at least {@link #ACTIVE_COEF} times as much as the score of
+ * the currently active stream.
+ * 2. Have score at least {@link #MIN_NEW_ACTIVE_SCORE}.
+ * 3. Have had its audio level updated in the last
+ * {@link #MAX_NEW_ACTIVE_SILENT_INTERVAL} milliseconds.
+ * 4. Have updated its audio level at least {@link #MIN_NEW_ACTIVE_SIZE} times.
+ * </p>
+ * <p>
+ * In order to actually replace the active, a competing stream has to have the
+ * highest score amongst all eligible streams.
+ * </p>
+ * <p>
+ * These rules and the constant values were chosen based on a few not very
+ * thorough tests in a conference. Some justification for the rules:
+ * 1. Helps to avoid often changing the active when there are two streams with
+ * similar levels.
+ * 2. This is to prevent switching the active stream away during times of
+ * &quot;silence&quot;. Without this threshold we observed the following:
+ * someone's microphone generates noise with levels above the levels of the
+ * active speaker. When the active speaker pauses speaking, the one with the
+ * higher noise becomes active.
+ * 3. This is for the case when someone quits the conference shouting ;)
+ * 4. This is because of the way we compute scores. Just-added streams might
+ * have an uncharacteristically high score.
+ * </p>
+ *
+ * @author Boris Grozev
+ */
+public class BasicActiveSpeakerDetector
+    extends AbstractActiveSpeakerDetector
+{
+    //TODO clean histories for very old streams (dont keep hitting MAX_NEW_ACTIVE_SILENT_INTERVAL)
+
+    private static final double ACTIVE_COEF = 1.15;
+
+    private static final int MAX_NEW_ACTIVE_SILENT_INTERVAL = 1000; //ms
+
+    private static final double MIN_NEW_ACTIVE_SCORE = 120.;
+
+    private static final int MIN_NEW_ACTIVE_SIZE = 20;
+
+    private History active;
+
+    private final Object activeSyncRoot = new Object();
+
+    private final Map<Long, History> histories = new HashMap<Long, History>();
+
+    private final ReadWriteLock historiesLock = new ReentrantReadWriteLock();
+
+    private History getHistory(long ssrc)
+    {
+        History history = null;
+
+        Lock readLock = historiesLock.readLock();
+
+        readLock.lock();
+        try
+        {
+            history = histories.get(ssrc);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+
+        if (history == null)
+        {
+            history = new History(ssrc);
+            Lock writeLock = historiesLock.writeLock();
+
+            writeLock.lock();
+            try
+            {
+                histories.put(ssrc, history);
+            }
+            finally
+            {
+                writeLock.unlock();
+            }
+        }
+
+        return history;
+    }
+
+    @Override
+    public void levelChanged(long ssrc, int level)
+    {
+        History history = getHistory(ssrc);
+
+        history.update(level);
+
+        updateActive();
+    }
+
+    private History setInitialActive(ArrayList<History> histories)
+    {
+        History bestHistory = null;
+        Double bestScore = 0.;
+
+        for (History h : histories)
+        {
+            if (h.score >= bestScore)
+            {
+                bestHistory = h;
+                bestScore = h.score;
+            }
+        }
+
+        synchronized (activeSyncRoot)
+        {
+            active = bestHistory;
+        }
+
+        return bestHistory;
+    }
+
+    private void updateActive()
+    {
+        History active;
+        synchronized (activeSyncRoot)
+        {
+            active = this.active;
+        }
+
+        ArrayList<History> histories;
+        Lock readLock = historiesLock.readLock();
+
+        readLock.lock();
+        try
+        {
+            histories = new ArrayList<History>(this.histories.values());
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+
+        if (histories.isEmpty())
+            return;
+
+        if (active == null)
+        {
+            active = setInitialActive(histories);
+            if (active != null)
+                fireActiveSpeakerChanged(active.ssrc);
+            return;
+        }
+
+        History newActive = active;
+        for (History history : histories)
+        {
+            if (history.lastUpdate != -1
+                    && history.lastUpdate + MAX_NEW_ACTIVE_SILENT_INTERVAL
+                        < System.currentTimeMillis()) //rule 4 in class javadoc
+            {
+                history.reset();
+            }
+
+            if (history.score > active.score * ACTIVE_COEF //rule 1
+                  && history.score > newActive.score //highest score among eligible
+                  && history.size >= MIN_NEW_ACTIVE_SIZE //rule 3
+                  && history.score >= MIN_NEW_ACTIVE_SCORE) //rule 2
+                newActive = history;
+        }
+
+        if (newActive != active)
+        {
+            synchronized (activeSyncRoot)
+            {
+                this.active = newActive;
+            }
+            fireActiveSpeakerChanged(newActive.ssrc);
+        }
+    }
+
+    private static class History
+    {
+        private static final int C_OLDER = 1;
+        private static final int C_RECENT = 2;
+        private static final int SIZE = 25 + 100;
+
+        private int head = 0;
+        private int[] history = new int[SIZE];
+        private long lastUpdate = -1;
+        private double score = 0.;
+        private int size = 0;
+        private long ssrc = -1;
+
+        private History(long ssrc)
+        {
+            this.ssrc = ssrc;
+        }
+
+        private synchronized void reset()
+        {
+            lastUpdate = -1;
+            size = head = 0;
+            score = 0.;
+            Arrays.fill(history, 0);
+        }
+
+        private synchronized void update(int level)
+        {
+            //TODO compute score efficiently
+
+            history[head] = level;
+
+            head = (head+1)%SIZE;
+            size = Math.min(size+1, SIZE);
+
+            int sum = 0;
+            for (int i=0; i<100; i++)
+                sum += history[(head+i)%SIZE];
+
+            int sum2 = 0;
+            for (int i=0; i<25; i++)
+                sum2 += history[(SIZE+head-1-i)%SIZE];
+
+            score = C_OLDER*((double)sum)/100 + C_RECENT*((double)sum2)/25;
+            lastUpdate = System.currentTimeMillis();
+//            if(score>110)
+//            {
+//                System.err.println(
+//                        getClass().getName() + " " + ssrc + " update(" + level
+//                            + ") score=" + score);
+//            }
+        }
+    }
+}
diff --git a/src/org/jitsi/impl/neomedia/transform/csrc/CsrcAudioLevelDispatcher.java b/src/org/jitsi/impl/neomedia/transform/csrc/CsrcAudioLevelDispatcher.java
new file mode 100644
index 00000000..eb0db1c9
--- /dev/null
+++ b/src/org/jitsi/impl/neomedia/transform/csrc/CsrcAudioLevelDispatcher.java
@@ -0,0 +1,183 @@
+package org.jitsi.impl.neomedia.transform.csrc;
+
+import java.util.concurrent.*;
+
+import org.jitsi.impl.neomedia.*;
+
+/**
+ * A simple thread that waits for new levels to be reported from incoming
+ * RTP packets and then delivers them to the <tt>AudioMediaStream</tt>
+ * associated with this engine. The reason we need to do this in a separate
+ * thread is, of course, the time sensitive nature of incoming RTP packets.
+ *
+ * @author Emil Ivov
+ * @author Lyubomir Marinov
+ */
+public class CsrcAudioLevelDispatcher
+    implements Runnable
+{
+    /**
+     * The pool of <tt>Thread</tt>s which run
+     * <tt>CsrcAudioLevelDispatcher</tt>s.
+     */
+    private static final ExecutorService threadPool
+        = Executors.newCachedThreadPool(
+                new ThreadFactory()
+                {
+                    /**
+                     * The default <tt>ThreadFactory</tt> implementation which
+                     * is augmented by this instance to create daemon
+                     * <tt>Thread</tt>s.
+                     */
+                    private final ThreadFactory defaultThreadFactory
+                        = Executors.defaultThreadFactory();
+
+                    @Override
+                    public Thread newThread(Runnable r)
+                    {
+                        Thread t = defaultThreadFactory.newThread(r);
+
+                        if (t != null)
+                        {
+                            t.setDaemon(true);
+
+                            /*
+                             * Additionally, make it known through the name of
+                             * the Thread that it is associated with the
+                             * CsrcAudioLevelDispatcher class for
+                             * debugging/informational purposes.
+                             */
+                            String name = t.getName();
+
+                            if (name == null)
+                                name = "";
+                            t.setName("CsrcAudioLevelDispatcher-" + name);
+                        }
+                        return t;
+                    }
+                });
+
+    /** The levels that we last received from the reverseTransform thread*/
+    private long[] lastReportedLevels = null;
+
+    /**
+     * The <tt>AudioMediaStreamImpl</tt> which listens to this event dispatcher.
+     * If <tt>null</tt>, this event dispatcher is stopped. If non-<tt>null</tt>,
+     * this event dispatcher is started.
+     */
+    private AudioMediaStreamImpl mediaStream;
+
+    /**
+     * The indicator which determines whether this event dispatcher has been
+     * scheduled for execution by {@link #threadPool}.
+     */
+    private boolean scheduled = false;
+
+    /**
+     * Initializes a new <tt>CsrcAudioLevelDispatcher</tt> to dispatch events
+     * to a specific <tt>AudioMediaStreamImpl</tt>.
+     *
+     * @param mediaStream the <tt>AudioMediaStreamImpl</tt> to which the new
+     * instance is to dispatch events
+     */
+    public CsrcAudioLevelDispatcher(AudioMediaStreamImpl mediaStream)
+    {
+        setMediaStream(mediaStream);
+    }
+
+    /**
+     * A level matrix that we should deliver to our media stream and
+     * its listeners in a separate thread.
+     *
+     * @param levels the levels that we'd like to queue for processing.
+     */
+    public void addLevels(long[] levels)
+    {
+        synchronized(this)
+        {
+            this.lastReportedLevels = levels;
+
+            if ((mediaStream != null) && !scheduled)
+            {
+                threadPool.execute(this);
+                scheduled = true;
+            }
+
+            notifyAll();
+        }
+    }
+
+    /**
+     * Waits for new levels to be reported via the <tt>addLevels()</tt> method
+     * and then delivers them to the <tt>AudioMediaStream</tt> that we are
+     * associated with.
+     */
+    @Override
+    public void run()
+    {
+        try
+        {
+            do
+            {
+                AudioMediaStreamImpl mediaStream;
+                long[] levels;
+
+                synchronized(this)
+                {
+                    // If the mediaStream is null, this instance is to stop.
+                    mediaStream = this.mediaStream;
+                    if (mediaStream == null)
+                    {
+                        scheduled = false;
+                        break;
+                    }
+
+                    if(lastReportedLevels == null)
+                    {
+                        try { wait(); } catch (InterruptedException ie) {}
+                        continue;
+                    }
+                    else
+                    {
+                        levels = lastReportedLevels;
+                        lastReportedLevels = null;
+                    }
+                }
+
+                if(levels != null)
+                    mediaStream.audioLevelsReceived(levels);
+            }
+            while (true);
+        }
+        finally
+        {
+            synchronized (this)
+            {
+                scheduled = false;
+            }
+        }
+    }
+
+    /**
+     * Causes our run method to exit so that this thread would stop
+     * handling levels.
+     */
+    public void setMediaStream(AudioMediaStreamImpl mediaStream)
+    {
+        synchronized(this)
+        {
+            if (this.mediaStream != mediaStream)
+            {
+                this.mediaStream = mediaStream;
+
+                /*
+                 * If the mediaStream changes, it is unlikely that the
+                 * lastReportedLevels are associated with it.
+                 */
+                lastReportedLevels = null;
+
+                notifyAll();
+            }
+        }
+    }
+}
diff --git a/src/org/jitsi/impl/neomedia/transform/csrc/CsrcTransformEngine.java b/src/org/jitsi/impl/neomedia/transform/csrc/CsrcTransformEngine.java
index ab4f558b..dc069201 100644
--- a/src/org/jitsi/impl/neomedia/transform/csrc/CsrcTransformEngine.java
+++ b/src/org/jitsi/impl/neomedia/transform/csrc/CsrcTransformEngine.java
@@ -38,7 +38,7 @@ public class CsrcTransformEngine
     /**
      * The dispatcher that is delivering audio levels to the media steam.
      */
-    private CsrcAudioLevelDispatcher csrcAudioLevelDispatcher = null;
+    private final CsrcAudioLevelDispatcher csrcAudioLevelDispatcher;
 
     /**
      * The buffer that we use to encode the csrc audio level extensions.
@@ -93,6 +93,18 @@ public CsrcTransformEngine(MediaStreamImpl mediaStream)
                 }
             }
         }
+
+        // Audio levels are received in RTP audio streams only.
+        if (this.mediaStream instanceof AudioMediaStreamImpl)
+        {
+            csrcAudioLevelDispatcher
+                = new CsrcAudioLevelDispatcher(
+                        (AudioMediaStreamImpl) this.mediaStream);
+        }
+        else
+        {
+            csrcAudioLevelDispatcher = null;
+        }
     }
 
     /**
@@ -102,7 +114,7 @@ public CsrcTransformEngine(MediaStreamImpl mediaStream)
     public void close()
     {
         if (csrcAudioLevelDispatcher != null)
-            csrcAudioLevelDispatcher.stop();
+            csrcAudioLevelDispatcher.setMediaStream(null);
     }
 
     /**
@@ -209,20 +221,14 @@ public PacketTransformer getRTPTransformer()
     public RawPacket reverseTransform(RawPacket pkt)
     {
         if ((csrcAudioLevelExtID > 0)
-                && csrcAudioLevelDirection.allowsReceiving())
+                && csrcAudioLevelDirection.allowsReceiving()
+                && (csrcAudioLevelDispatcher != null))
         {
             //extract the audio levels and send them to the dispatcher.
             long[] levels = pkt.extractCsrcAudioLevels(csrcAudioLevelExtID);
 
             if (levels != null)
-            {
-                if (csrcAudioLevelDispatcher == null)
-                {
-                    csrcAudioLevelDispatcher = new CsrcAudioLevelDispatcher();
-                    new Thread(csrcAudioLevelDispatcher).start();
-                }
                 csrcAudioLevelDispatcher.addLevels(levels);
-            }
         }
 
         return pkt;
@@ -285,88 +291,4 @@ public synchronized RawPacket transform(RawPacket pkt)
 
         return pkt;
     }
-
-    /**
-     * A simple thread that waits for new levels to be reported from incoming
-     * RTP packets and then delivers them to the <tt>AudioMediaStream</tt>
-     * associated with this engine. The reason we need to do this in a separate
-     * thread is, of course, the time sensitive nature of incoming RTP packets.
-     */
-    private class CsrcAudioLevelDispatcher
-        implements Runnable
-    {
-        /** Indicates whether this thread is supposed to be running */
-        private boolean isRunning = false;
-
-        /** The levels that we last received from the reverseTransform thread*/
-        private long[] lastReportedLevels = null;
-
-        /**
-         * A level matrix that we should deliver to our media stream and
-         * its listeners in a separate thread.
-         *
-         * @param levels the levels that we'd like to queue for processing.
-         */
-        public void addLevels(long[] levels)
-        {
-            synchronized(this)
-            {
-                this.lastReportedLevels = levels;
-                notifyAll();
-            }
-        }
-
-        /**
-         * Waits for new levels to be reported via the <tt>addLevels()</tt>
-         * method and then delivers them to the <tt>AudioMediaStream</tt> that
-         * we are associated with.
-         */
-        public void run()
-        {
-            isRunning = true;
-
-            // Audio levels are received in RTP audio streams only.
-            if(!(mediaStream instanceof AudioMediaStreamImpl))
-                return;
-
-            AudioMediaStreamImpl audioStream
-                = (AudioMediaStreamImpl) mediaStream;
-
-            while(isRunning)
-            {
-                long[] audioLevels;
-
-                synchronized(this)
-                {
-                    if(lastReportedLevels == null)
-                    {
-                        try { wait(); } catch (InterruptedException ie) {}
-                        continue;
-                    }
-                    else
-                    {
-                        audioLevels = lastReportedLevels;
-                        lastReportedLevels = null;
-                    }
-                }
-
-                if(audioLevels != null)
-                    audioStream.audioLevelsReceived(audioLevels);
-            }
-        }
-
-        /**
-         * Causes our run method to exit so that this thread would stop
-         * handling levels.
-         */
-        public void stop()
-        {
-            synchronized(this)
-            {
-                this.lastReportedLevels = null;
-                isRunning = false;
-                notifyAll();
-            }
-        }
-    }
 }
diff --git a/src/org/jitsi/impl/neomedia/transform/csrc/SsrcTransformEngine.java b/src/org/jitsi/impl/neomedia/transform/csrc/SsrcTransformEngine.java
index 8a8d455c..d940ea44 100644
--- a/src/org/jitsi/impl/neomedia/transform/csrc/SsrcTransformEngine.java
+++ b/src/org/jitsi/impl/neomedia/transform/csrc/SsrcTransformEngine.java
@@ -68,6 +68,11 @@ public class SsrcTransformEngine
      */
     private static boolean readConfigurationServicePropertiesOnce = true;
 
+    /**
+     * The dispatcher that is delivering audio levels to the media steam.
+     */
+    private final CsrcAudioLevelDispatcher csrcAudioLevelDispatcher;
+
     /**
      * The number of consecutive RTP packets indicated as generated from a muted
      * audio source and dropped in {@link #reverseTransform(RawPacket)}.
@@ -120,6 +125,18 @@ public SsrcTransformEngine(MediaStreamImpl mediaStream)
         }
 
         readConfigurationServicePropertiesOnce();
+
+        // Audio levels are received in RTP audio streams only.
+        if (mediaStream instanceof AudioMediaStreamImpl)
+        {
+            csrcAudioLevelDispatcher
+                = new CsrcAudioLevelDispatcher(
+                        (AudioMediaStreamImpl) mediaStream);
+        }
+        else
+        {
+            csrcAudioLevelDispatcher = null;
+        }
     }
 
     /**
@@ -128,6 +145,8 @@ public SsrcTransformEngine(MediaStreamImpl mediaStream)
      */
     public void close()
     {
+        if (csrcAudioLevelDispatcher != null)
+            csrcAudioLevelDispatcher.setMediaStream(null);
     }
 
     /**
@@ -215,6 +234,19 @@ public RawPacket reverseTransform(RawPacket pkt)
                     pkt.setFlags(Buffer.FLAG_SILENCE | pkt.getFlags());
                 }
             }
+
+            /*
+             * Notify the AudioMediaStream associated with this instance about
+             * the received audio level.
+             */
+            if (!dropPkt && (csrcAudioLevelDispatcher != null))
+            {
+                long[] levels = new long[2];
+
+                levels[0] = 0xFFFFFFFFL & pkt.getSSRC();
+                levels[1] = 127 - level;
+                csrcAudioLevelDispatcher.addLevels(levels);
+            }
         }
         if (dropPkt)
         {
diff --git a/src/org/jitsi/service/neomedia/AbstractActiveSpeakerDetector.java b/src/org/jitsi/service/neomedia/AbstractActiveSpeakerDetector.java
new file mode 100644
index 00000000..393463c2
--- /dev/null
+++ b/src/org/jitsi/service/neomedia/AbstractActiveSpeakerDetector.java
@@ -0,0 +1,112 @@
+/*
+ * Jitsi, the OpenSource Java VoIP and Instant Messaging client.
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jitsi.service.neomedia;
+
+import java.util.*;
+
+import org.jitsi.service.neomedia.event.*;
+
+/**
+ * Provides a base {@link ActiveSpeakerDetector} which aids the implementations
+ * of actual algorithms for the detection/identification of the active/dominant
+ * speaker in a multipoint conference.
+ *
+ * @author Boris Grozev
+ * @author Lyubomir Marinov
+ */
+public abstract class AbstractActiveSpeakerDetector
+    implements ActiveSpeakerDetector
+{
+    /**
+     * An empty array with element type <tt>ActiveSpeakerChangedListener</tt>.
+     * Explicitly defined for the purposes of reducing the total number of
+     * unnecessary allocations and the undesired effects of the garbage
+     * collector.
+     */
+    private static final ActiveSpeakerChangedListener[] NO_LISTENERS
+        = new ActiveSpeakerChangedListener[0];
+
+    /**
+     * The list of listeners to be notified by this detector when the active
+     * speaker changes.
+     */
+    private final List<ActiveSpeakerChangedListener> listeners
+        = new LinkedList<ActiveSpeakerChangedListener>();
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws NullPointerException if the specified <tt>listener</tt> is
+     * <tt>null</tt>
+     */
+    @Override
+    public void addActiveSpeakerChangedListener(
+            ActiveSpeakerChangedListener listener)
+    {
+        if (listener == null)
+            throw new NullPointerException("listener");
+
+        synchronized (listeners)
+        {
+            if (!listeners.contains(listener))
+                listeners.add(listener);
+        }
+    }
+
+    /**
+     * Notifies the <tt>ActiveSpeakerChangedListener</tt>s registered with this
+     * instance that the active speaker in multipoint conference associated with
+     * this instance has changed and is identified by a specific synchronization
+     * source identifier/SSRC.
+     *
+     * @param ssrc the synchronization source identifier/SSRC of the active
+     * speaker in the multipoint conference
+     */
+    protected void fireActiveSpeakerChanged(long ssrc)
+    {
+        ActiveSpeakerChangedListener[] listeners
+            = getActiveSpeakerChangedListeners();
+
+        for (ActiveSpeakerChangedListener listener : listeners)
+            listener.activeSpeakerChanged(ssrc);
+    }
+
+    /**
+     * Gets the list of listeners to be notified by this detector when the
+     * active speaker changes.
+     *
+     * @return an array of the listeners to be notified by this detector when
+     * the active speaker changes. If no such listeners are registered with this
+     * instance, an empty array is returned. 
+     */
+    protected ActiveSpeakerChangedListener[] getActiveSpeakerChangedListeners()
+    {
+        synchronized (listeners)
+        {
+            return
+                (listeners.size() == 0)
+                    ? NO_LISTENERS
+                    : listeners.toArray(NO_LISTENERS);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void removeActiveSpeakerChangedListener(
+            ActiveSpeakerChangedListener listener)
+    {
+        if (listener != null)
+        {
+            synchronized (listeners)
+            {
+                listeners.remove(listener);
+            }
+        }
+    }
+}
diff --git a/src/org/jitsi/service/neomedia/ActiveSpeakerDetector.java b/src/org/jitsi/service/neomedia/ActiveSpeakerDetector.java
new file mode 100644
index 00000000..01ed34b6
--- /dev/null
+++ b/src/org/jitsi/service/neomedia/ActiveSpeakerDetector.java
@@ -0,0 +1,61 @@
+/*
+ * Jitsi, the OpenSource Java VoIP and Instant Messaging client.
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jitsi.service.neomedia;
+
+import org.jitsi.service.neomedia.event.*;
+
+/**
+ * Represents an algorithm for the detection/identification of the
+ * active/dominant speaker/participant/endpoint/stream in a multipoint
+ * conference.
+ * <p>
+ * Implementations of <tt>ActiveSpeakerDetector</tt> get notified about the
+ * (current) audio levels of multiple audio streams (identified by their
+ * synchronization source identifiers/SSRCs) via calls to
+ * {@link #levelChanged(long, int)} and determine/identify which stream is
+ * dominant/active (in terms of speech). When the active stream changes,
+ * listeners registered via
+ * {@link #addActiveSpeakerChangedListener(ActiveSpeakerChangedListener)} are
+ * notified.
+ * </p>
+ *
+ * @author Boris Grozev
+ * @author Lyubomir Marinov
+ */
+public interface ActiveSpeakerDetector
+{
+    /**
+     * Adds a listener to be notified by this active speaker detector when the
+     * active stream changes.
+     *
+     * @param listener the listener to register with this instance for
+     * notificatons about changes of the active speaker
+     */
+    public void addActiveSpeakerChangedListener(
+            ActiveSpeakerChangedListener listener);
+
+    /**
+     * Notifies this <tt>ActiveSpeakerDetector</tt> about the latest/current
+     * audio level of a stream/speaker identified by a specific synchronization
+     * source identifier/SSRC.
+     *
+     * @param ssrc the SSRC of the stream/speaker
+     * @param level the latest/current audio level of the stream/speaker with
+     * the specified <tt>ssrc</tt>
+     */
+    public void levelChanged(long ssrc, int level);
+
+    /**
+     * Removes a listener to no longer be notified by this active speaker
+     * detector when the active stream changes.
+     *
+     * @param listener the listener to unregister with this instance for
+     * notificatons about changes of the active speaker
+     */
+    public void removeActiveSpeakerChangedListener(
+            ActiveSpeakerChangedListener listener);
+}
diff --git a/src/org/jitsi/service/neomedia/event/ActiveSpeakerChangedListener.java b/src/org/jitsi/service/neomedia/event/ActiveSpeakerChangedListener.java
new file mode 100644
index 00000000..ec3bd432
--- /dev/null
+++ b/src/org/jitsi/service/neomedia/event/ActiveSpeakerChangedListener.java
@@ -0,0 +1,26 @@
+/*
+ * Jitsi, the OpenSource Java VoIP and Instant Messaging client.
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jitsi.service.neomedia.event;
+
+/**
+ * Implementing classes can be notified about changes to the 'active' stream
+ * (identified by its SSRC) using {@link #activeSpeakerChanged(long)}.
+ *
+ * @author Boris Grozev
+ */
+public interface ActiveSpeakerChangedListener
+{
+    /**
+     * Notifies this listener that the active/dominant stream/speaker has been
+     * changed to one identified by a specific synchronization source
+     * identifier/SSRC.
+     *
+     * @param ssrc the SSRC of the latest/current active/dominant
+     * stream/speaker
+     */
+    public void activeSpeakerChanged(long ssrc);
+}
-- 
GitLab