From d2f169a1ca8e9765d0dc51ae3d6cab8bc4634b3d Mon Sep 17 00:00:00 2001 From: Lyubomir Marinov <lyubomir.marinov@jitsi.org> Date: Mon, 25 Nov 2013 15:56:10 +0200 Subject: [PATCH] When relaying RTP, routes RTCP Feedback Messages to their indicated recepient only. --- .../neomedia/RTCPConnectorInputStream.java | 4 +- .../impl/neomedia/RTCPFeedbackPacket.java | 4 +- .../impl/neomedia/RTPTranslatorImpl.java | 402 ++++++++++++++---- .../device/VideoMediaDeviceSession.java | 14 +- .../ControlTransformInputStream.java | 15 +- .../neomedia/event/RTCPFeedbackListener.java | 1 - 6 files changed, 336 insertions(+), 104 deletions(-) diff --git a/src/org/jitsi/impl/neomedia/RTCPConnectorInputStream.java b/src/org/jitsi/impl/neomedia/RTCPConnectorInputStream.java index 02983a29..a8069aa2 100644 --- a/src/org/jitsi/impl/neomedia/RTCPConnectorInputStream.java +++ b/src/org/jitsi/impl/neomedia/RTCPConnectorInputStream.java @@ -90,10 +90,10 @@ public static void fireRTCPFeedbackReceived( || (pt == RTCPFeedbackEvent.PT_TL)) { int fmt = buffer[offset] & 0x1F; - RTCPFeedbackEvent evt = new RTCPFeedbackEvent(source, fmt, pt); + RTCPFeedbackEvent ev = new RTCPFeedbackEvent(source, fmt, pt); for (RTCPFeedbackListener l : listeners) - l.rtcpFeedbackReceived(evt); + l.rtcpFeedbackReceived(ev); } } } diff --git a/src/org/jitsi/impl/neomedia/RTCPFeedbackPacket.java b/src/org/jitsi/impl/neomedia/RTCPFeedbackPacket.java index ac30fbf4..c6b1b677 100644 --- a/src/org/jitsi/impl/neomedia/RTCPFeedbackPacket.java +++ b/src/org/jitsi/impl/neomedia/RTCPFeedbackPacket.java @@ -28,12 +28,12 @@ public class RTCPFeedbackPacket /** * SSRC of packet sender. */ - private long senderSSRC = 0; + private final long senderSSRC; /** * SSRC of media source. */ - private long sourceSSRC = 0; + private final long sourceSSRC; /** * Constructor. diff --git a/src/org/jitsi/impl/neomedia/RTPTranslatorImpl.java b/src/org/jitsi/impl/neomedia/RTPTranslatorImpl.java index 385c1a23..608e8ea3 100644 --- a/src/org/jitsi/impl/neomedia/RTPTranslatorImpl.java +++ b/src/org/jitsi/impl/neomedia/RTPTranslatorImpl.java @@ -49,10 +49,10 @@ public class RTPTranslatorImpl private static final boolean CREATE_FAKE_SEND_STREAM_IF_NECESSARY = false; /** - * An array with <tt>long</tt> element type and no elements explicitly - * defined to reduce unnecessary allocations. + * An array with <tt>int</tt> element type and no elements explicitly + * defined to reduce unnecessary allocations. */ - private static final long[] EMPTY_LONG_ARRAY = new long[0]; + private static final int[] EMPTY_INT_ARRAY = new int[0]; /** * The <tt>RTPConnector</tt> which is used by {@link #manager} and which @@ -479,7 +479,7 @@ public synchronized void dispose(StreamRTPManager streamRTPManager) */ private synchronized StreamRTPManagerDesc findStreamRTPManagerDescByReceiveSSRC( - long receiveSSRC, + int receiveSSRC, StreamRTPManagerDesc exclusion) { for (int i = 0, count = streamRTPManagers.size(); i < count; i++) @@ -582,9 +582,13 @@ public synchronized Vector<ReceiveStream> getReceiveStreams( for (Object s : managerReceiveStreams) { ReceiveStream receiveStream = (ReceiveStream) s; + /* + * FMJ stores the synchronization source (SSRC) identifiers + * as 32-bit signed values. + */ + int receiveSSRC = (int) receiveStream.getSSRC(); - if (streamRTPManagerDesc.containsReceiveSSRC( - receiveStream.getSSRC())) + if (streamRTPManagerDesc.containsReceiveSSRC(receiveSSRC)) receiveStreams.add(receiveStream); } } @@ -695,25 +699,39 @@ private static void logRTCP( * Do the bytes in the specified buffer resemble (a header of) an RTCP * packet? */ - if (length > 8) + if (length >= 8 /* BYE */) { byte b0 = buffer[offset]; int v = (b0 & 0xc0) >>> 6; - if (v == 2) + if (v == RTCPHeader.VERSION) { byte b1 = buffer[offset + 1]; int pt = b1 & 0xff; if (pt == 203 /* BYE */) { - int sc = b0 & 0x1f; - long ssrc = (sc > 0) ? readInt(buffer, offset + 4) : -1; + // Verify the length field. + int rtcpLength + = (readUnsignedShort(buffer, offset + 2) + 1) * 4; + + if (rtcpLength <= length) + { + int sc = b0 & 0x1f; + int off = offset + 4; - logger.trace( - obj.getClass().getName() + '.' + methodName - + ": RTCP BYE v=" + v + "; pt=" + pt + "; ssrc=" - + ssrc + ';'); + for (int i = 0, end = offset + length; + (i < sc) && (off + 4 <= end); + i++, off += 4) + { + int ssrc = readInt(buffer, off); + + logger.trace( + obj.getClass().getName() + '.' + methodName + + ": RTCP BYE SSRC/CSRC " + + Long.toString(ssrc & 0xffffffffl)); + } + } } } } @@ -751,13 +769,18 @@ private synchronized int read( if (data) { - /** - * Ignore RTP packets coming from peers whose MediaStream's direction - * does not allow receiving. + /* + * Ignore RTP packets coming from peers whose MediaStream's + * direction does not allow receiving. */ - if (!streamRTPManagerDesc.streamRTPManager.getMediaStream() - .getDirection().allowsReceiving()) + if (!streamRTPManagerDesc + .streamRTPManager + .getMediaStream() + .getDirection() + .allowsReceiving()) + { return read; + } /* * Do the bytes in the specified buffer resemble (a header of) an @@ -767,7 +790,7 @@ private synchronized int read( && (/* v */ ((buffer[offset] & 0xc0) >>> 6) == RTPHeader.VERSION)) { - long ssrc = readInt(buffer, offset + 8); + int ssrc = readInt(buffer, offset + 8); if (!streamRTPManagerDesc.containsReceiveSSRC(ssrc)) { @@ -775,9 +798,13 @@ private synchronized int read( ssrc, streamRTPManagerDesc) == null) + { streamRTPManagerDesc.addReceiveSSRC(ssrc); + } else + { return 0; + } } int pt = buffer[offset + 1] & 0x7f; @@ -815,22 +842,37 @@ else if (logger.isTraceEnabled()) /** * Reads an <tt>int</tt> from a specific <tt>byte</tt> buffer starting at a - * specific <tt>offset</tt>. The implementation is the same as + * specific offset. The implementation is the same as * {@link DataInputStream#readInt()}. * - * @param buffer the <tt>byte</tt> buffer to read an <tt>int</tt> from - * @param offset the zero-based offset in <tt>buffer</tt> to start reading - * an <tt>int</tt> from - * @return an <tt>int</tt> read from the specified <tt>buffer</tt> starting - * at the specified <tt>offset</tt> + * @param buf the <tt>byte</tt> buffer to read an <tt>int</tt> from + * @param off the zero-based offset in <tt>buf</tt> to start reading an + * <tt>int</tt> from + * @return an <tt>int</tt> read from the specified <tt>buf</tt> starting at + * the specified <tt>off</tt> */ - public static int readInt(byte[] buffer, int offset) + public static int readInt(byte[] buf, int off) { return - ((buffer[offset++] & 0xff) << 24) - | ((buffer[offset++] & 0xff) << 16) - | ((buffer[offset++] & 0xff) << 8) - | (buffer[offset] & 0xff); + ((buf[off++] & 0xff) << 24) + | ((buf[off++] & 0xff) << 16) + | ((buf[off++] & 0xff) << 8) + | (buf[off] & 0xff); + } + + /** + * Reads a 16-bit unsigned value from a specific <tt>byte</tt> buffer + * starting at a specific offset and returns it as an <tt>int</tt>. + * + * @param buf the <tt>byte</tt> buffer to read a 16-bit unsigned value from + * @param off the zero-based offset in <tt>buf</tt> to start reading a + * 16-bit unsigned value from + * @return an <tt>int</tt> read from the specified <tt>buf</tt> as a 16-bit + * unsigned value starting at the specified <tt>off</tt> + */ + public static int readUnsignedShort(byte[] buf, int off) + { + return ((buf[off++] & 0xff) << 8) | (buf[off] & 0xff); } /** @@ -928,15 +970,22 @@ public void update(ReceiveStreamEvent event) if (receiveStream != null) { + /* + * FMJ stores the synchronization source (SSRC) identifiers as + * 32-bit signed values. + */ + int receiveSSRC = (int) receiveStream.getSSRC(); StreamRTPManagerDesc streamRTPManagerDesc - = findStreamRTPManagerDescByReceiveSSRC( - receiveStream.getSSRC(), - null); + = findStreamRTPManagerDescByReceiveSSRC(receiveSSRC, null); if (streamRTPManagerDesc != null) + { for (ReceiveStreamListener listener : streamRTPManagerDesc.getReceiveStreamListeners()) + { listener.update(event); + } + } } } } @@ -1015,7 +1064,7 @@ private synchronized int doWrite( Format format, StreamRTPManagerDesc exclusion) { - int write = 0; + int written = 0; for (int streamIndex = 0, streamCount = streams.size(); streamIndex < streamCount; @@ -1025,51 +1074,39 @@ private synchronized int doWrite( StreamRTPManagerDesc streamRTPManagerDesc = streamDesc.connectorDesc.streamRTPManagerDesc; - if (streamRTPManagerDesc != exclusion) - { - if (data) - { - /* - * Only write data packets to OutputDataStream-s for - * which the associated MediaStream allows sending. - */ - if (!streamRTPManagerDesc.streamRTPManager. - getMediaStream().getDirection().allowsSending()) - return write; - - if ((format != null) && (length > 0)) - { - Integer payloadType - = streamRTPManagerDesc.getPayloadType(format); + if (streamRTPManagerDesc == exclusion) + continue; - if ((payloadType == null) && (exclusion != null)) - payloadType = exclusion.getPayloadType(format); - if (payloadType != null) - { - int payloadTypeByteIndex = offset + 1; + boolean write; - buffer[payloadTypeByteIndex] - = (byte) - ((buffer[payloadTypeByteIndex] & 0x80) - | (payloadType & 0x7f)); - } - } - } - else if (logger.isTraceEnabled()) - { - logRTCP( - this, "doWrite", - buffer, offset, length); - } + if (data) + { + write + = willWriteData( + streamRTPManagerDesc, + buffer, offset, length, + format, + exclusion); + } + else + { + write + = willWriteControl( + streamRTPManagerDesc, + buffer, offset, length, + format, + exclusion); + } + if (!write) + continue; - int streamWrite - = streamDesc.stream.write(buffer, offset, length); + int streamWritten + = streamDesc.stream.write(buffer, offset, length); - if (write < streamWrite) - write = streamWrite; - } + if (written < streamWritten) + written = streamWritten; } - return write; + return written; } public synchronized void removeStreams(RTPConnectorDesc connectorDesc) @@ -1174,6 +1211,156 @@ public void run() } } + /** + * Notifies this instance that a specific <tt>byte</tt> buffer will be + * written into the control <tt>OutputDataStream</tt> of a specific + * <tt>StreamRTPManagerDesc</tt>. + * + * @param destination the <tt>StreamRTPManagerDesc</tt> which is the + * destination of the write + * @param buffer the data to be written into <tt>destination</tt> + * @param offset the offset in <tt>buffer</tt> at which the data to be + * written into <tt>destination</tt> starts + * @param length the number of <tt>byte</tt>s in <tt>buffer</tt> + * beginning at <tt>offset</tt> which constitute the data to the written + * into <tt>destination</tt> + * @param format the FMJ <tt>Format</tt> of the data to be written into + * <tt>destination</tt> + * @param exclusion the <tt>StreamRTPManagerDesc</tt> which is exclude + * from the write batch, possibly because it is the cause of the write + * batch in the first place + * @return <tt>true</tt> to write the specified data into the specified + * <tt>destination</tt> or <tt>false</tt> to not write the specified + * data into the specified <tt>destination</tt> + */ + private boolean willWriteControl( + StreamRTPManagerDesc destination, + byte[] buffer, int offset, int length, + Format format, + StreamRTPManagerDesc exclusion) + { + boolean write = true; + + /* + * Do the bytes in the specified buffer resemble (a header of) an + * RTCP packet? + */ + if (length >= 12 /* FB */) + { + byte b0 = buffer[offset]; + int v = (b0 & 0xc0) >>> 6; /* version */ + + if (v == RTCPHeader.VERSION) + { + byte b1 = buffer[offset + 1]; + int pt = b1 & 0xff; /* payload type */ + + if ((pt == 205 /* RTPFB */) || (pt == 206 /* PSFB */)) + { + // Verify the length field. + int rtcpLength + = (readUnsignedShort(buffer, offset + 2) + 1) * 4; + + if (rtcpLength <= length) + { + int ssrcOfMediaSource = readInt(buffer, offset + 8); + + if (destination.containsReceiveSSRC( + ssrcOfMediaSource)) + { + if (logger.isTraceEnabled()) + { + int fmt = b0 & 0x1f; /* feedback message type */ + int ssrcOfPacketSender + = readInt(buffer, offset + 4); + String message + = getClass().getName() + + ".willWriteControl: FMT " + fmt + + ", PT " + pt + + ", SSRC of packet sender " + + Long.toString( + ssrcOfPacketSender + & 0xffffffffl) + + ", SSRC of media source " + + Long.toString( + ssrcOfMediaSource + & 0xffffffffl); + + logger.trace(message); + } + } + else + { + write = false; + } + } + } + } + } + + if (write && logger.isTraceEnabled()) + logRTCP(this, "doWrite", buffer, offset, length); + return write; + } + + /** + * Notifies this instance that a specific <tt>byte</tt> buffer will be + * written into the data <tt>OutputDataStream</tt> of a specific + * <tt>StreamRTPManagerDesc</tt>. + * + * @param destination the <tt>StreamRTPManagerDesc</tt> which is the + * destination of the write + * @param buffer the data to be written into <tt>destination</tt> + * @param offset the offset in <tt>buffer</tt> at which the data to be + * written into <tt>destination</tt> starts + * @param length the number of <tt>byte</tt>s in <tt>buffer</tt> + * beginning at <tt>offset</tt> which constitute the data to the written + * into <tt>destination</tt> + * @param format the FMJ <tt>Format</tt> of the data to be written into + * <tt>destination</tt> + * @param exclusion the <tt>StreamRTPManagerDesc</tt> which is exclude + * from the write batch, possibly because it is the cause of the write + * batch in the first place + * @return <tt>true</tt> to write the specified data into the specified + * <tt>destination</tt> or <tt>false</tt> to not write the specified + * data into the specified <tt>destination</tt> + */ + private boolean willWriteData( + StreamRTPManagerDesc destination, + byte[] buffer, int offset, int length, + Format format, + StreamRTPManagerDesc exclusion) + { + /* + * Only write data packets to OutputDataStreams for which the + * associated MediaStream allows sending. + */ + if (!destination.streamRTPManager.getMediaStream().getDirection() + .allowsSending()) + { + return false; + } + + if ((format != null) && (length > 0)) + { + Integer payloadType = destination.getPayloadType(format); + + if ((payloadType == null) && (exclusion != null)) + payloadType = exclusion.getPayloadType(format); + if (payloadType != null) + { + int payloadTypeByteIndex = offset + 1; + + buffer[payloadTypeByteIndex] + = (byte) + ((buffer[payloadTypeByteIndex] & 0x80) + | (payloadType & 0x7f)); + } + } + + return true; + } + public int write(byte[] buffer, int offset, int length) { return doWrite(buffer, offset, length, null, null); @@ -1994,6 +2181,12 @@ public void stop() } } + /** + * Describes additional information about a <tt>StreamRTPManager</tt> for + * the purposes of <tt>RTPTranslatorImpl</tt>. + * + * @author Lyubomir Marinov + */ private static class StreamRTPManagerDesc { public RTPConnectorDesc connectorDesc; @@ -2001,13 +2194,24 @@ private static class StreamRTPManagerDesc private final Map<Integer, Format> formats = new HashMap<Integer, Format>(); - private long[] receiveSSRCs = EMPTY_LONG_ARRAY; + /** + * The list of synchronization source (SSRC) identifiers received by + * {@link #streamRTPManager} (as <tt>ReceiveStream</tt>s). + */ + private int[] receiveSSRCs = EMPTY_INT_ARRAY; private final List<ReceiveStreamListener> receiveStreamListeners = new LinkedList<ReceiveStreamListener>(); public final StreamRTPManager streamRTPManager; + /** + * Initializes a new <tt>StreamRTPManagerDesc</tt> instance which is to + * describe a specific <tt>StreamRTPManager</tt>. + * + * @param streamRTPManager the <tt>StreamRTPManager</tt> to be described + * by the new instance + */ public StreamRTPManagerDesc(StreamRTPManager streamRTPManager) { this.streamRTPManager = streamRTPManager; @@ -2021,17 +2225,24 @@ public void addFormat(Format format, int payloadType) } } - public synchronized void addReceiveSSRC(long receiveSSRC) + /** + * Adds a new synchronization source (SSRC) identifier to the list of + * SSRC received by the associated <tt>StreamRTPManager</tt>. + * + * @param receiveSSRC the new SSRC to add to the list of SSRC received + * by the associated <tt>StreamRTPManager</tt> + */ + public synchronized void addReceiveSSRC(int receiveSSRC) { if (!containsReceiveSSRC(receiveSSRC)) { int receiveSSRCCount = receiveSSRCs.length; - long[] newReceiveSSRCs = new long[receiveSSRCCount + 1]; + int[] newReceiveSSRCs = new int[receiveSSRCCount + 1]; System.arraycopy( - receiveSSRCs, 0, - newReceiveSSRCs, 0, - receiveSSRCCount); + receiveSSRCs, 0, + newReceiveSSRCs, 0, + receiveSSRCCount); newReceiveSSRCs[receiveSSRCCount] = receiveSSRC; receiveSSRCs = newReceiveSSRCs; } @@ -2046,6 +2257,27 @@ public void addReceiveStreamListener(ReceiveStreamListener listener) } } + /** + * Determines whether the list of synchronization source (SSRC) + * identifiers received by the associated <tt>StreamRTPManager</tt> + * contains a specific SSRC. + * + * @param receiveSSRC the SSRC to check whether it is contained in the + * list of SSRC received by the associated <tt>StreamRTPManager</tt> + * @return <tt>true</tt> if the specified <tt>receiveSSRC</tt> is + * contained in the list of SSRC received by the associated + * <tt>StreamRTPManager</tt>; otherwise, <tt>false</tt> + */ + public synchronized boolean containsReceiveSSRC(int receiveSSRC) + { + for (int i = 0; i < receiveSSRCs.length; i++) + { + if (receiveSSRCs[i] == receiveSSRC) + return true; + } + return false; + } + public Format getFormat(int payloadType) { synchronized (formats) @@ -2091,14 +2323,6 @@ public ReceiveStreamListener[] getReceiveStreamListeners() } } - public synchronized boolean containsReceiveSSRC(long receiveSSRC) - { - for (int i = 0; i < receiveSSRCs.length; i++) - if (receiveSSRCs[i] == receiveSSRC) - return true; - return false; - } - public void removeReceiveStreamListener(ReceiveStreamListener listener) { synchronized (receiveStreamListeners) diff --git a/src/org/jitsi/impl/neomedia/device/VideoMediaDeviceSession.java b/src/org/jitsi/impl/neomedia/device/VideoMediaDeviceSession.java index 09339a7d..32d5f3bd 100644 --- a/src/org/jitsi/impl/neomedia/device/VideoMediaDeviceSession.java +++ b/src/org/jitsi/impl/neomedia/device/VideoMediaDeviceSession.java @@ -158,7 +158,7 @@ private static Component getVisualComponent(Player player) * A list with RTCPFeedbackCreateListener which will be notified when * a RTCPFeedbackListener is created. */ - private List<RTCPFeedbackCreateListener> rtcpFeedbackCreateListners + private List<RTCPFeedbackCreateListener> rtcpFeedbackCreateListeners = new LinkedList<RTCPFeedbackCreateListener>(); /** @@ -201,9 +201,9 @@ public VideoMediaDeviceSession(AbstractMediaDevice device) public void addRTCPFeedbackCreateListner( RTCPFeedbackCreateListener listener) { - synchronized (rtcpFeedbackCreateListners) + synchronized (rtcpFeedbackCreateListeners) { - rtcpFeedbackCreateListners.add(listener); + rtcpFeedbackCreateListeners.add(listener); } if (encoder != null) @@ -1411,9 +1411,9 @@ private void playerVisualComponentResized( public void removeRTCPFeedbackCreateListner( RTCPFeedbackCreateListener listener) { - synchronized (rtcpFeedbackCreateListners) + synchronized (rtcpFeedbackCreateListeners) { - rtcpFeedbackCreateListners.remove(listener); + rtcpFeedbackCreateListeners.remove(listener); } } @@ -1685,9 +1685,9 @@ protected Format setProcessorFormat( this.encoder = encoder; onRTCPFeedbackCreate(encoder); - synchronized (rtcpFeedbackCreateListners) + synchronized (rtcpFeedbackCreateListeners) { - for (RTCPFeedbackCreateListener l : rtcpFeedbackCreateListners) + for (RTCPFeedbackCreateListener l : rtcpFeedbackCreateListeners) l.onRTCPFeedbackCreate(encoder); } diff --git a/src/org/jitsi/impl/neomedia/transform/ControlTransformInputStream.java b/src/org/jitsi/impl/neomedia/transform/ControlTransformInputStream.java index f81747b7..cfe0125b 100644 --- a/src/org/jitsi/impl/neomedia/transform/ControlTransformInputStream.java +++ b/src/org/jitsi/impl/neomedia/transform/ControlTransformInputStream.java @@ -50,8 +50,11 @@ public void addRTCPFeedbackListener(RTCPFeedbackListener listener) { if (listener == null) throw new NullPointerException("listener"); - if(!listeners.contains(listener)) - listeners.add(listener); + synchronized (listeners) + { + if (!listeners.contains(listener)) + listeners.add(listener); + } } /** @@ -61,7 +64,13 @@ public void addRTCPFeedbackListener(RTCPFeedbackListener listener) */ public void removeRTCPFeedbackListener(RTCPFeedbackListener listener) { - listeners.remove(listener); + if (listener != null) + { + synchronized (listeners) + { + listeners.remove(listener); + } + } } /** diff --git a/src/org/jitsi/service/neomedia/event/RTCPFeedbackListener.java b/src/org/jitsi/service/neomedia/event/RTCPFeedbackListener.java index 7b726976..20516183 100644 --- a/src/org/jitsi/service/neomedia/event/RTCPFeedbackListener.java +++ b/src/org/jitsi/service/neomedia/event/RTCPFeedbackListener.java @@ -24,4 +24,3 @@ public interface RTCPFeedbackListener */ public void rtcpFeedbackReceived(RTCPFeedbackEvent event); } - -- GitLab