Skip to content
Snippets Groups Projects
RTPTranslatorImpl.java 56.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • /*
     * Jitsi, the OpenSource Java VoIP and Instant Messaging client.
     *
     * Distributable under LGPL license.
     * See terms of license at gnu.org.
     */
    package org.jitsi.impl.neomedia;
    
    import java.io.*;
    import java.lang.reflect.*;
    import java.util.*;
    
    import javax.media.*;
    import javax.media.format.*;
    import javax.media.protocol.*;
    import javax.media.rtp.*;
    import javax.media.rtp.event.*;
    import javax.media.rtp.rtcp.*;
    
    
    import org.jitsi.impl.neomedia.protocol.*;
    
    import org.jitsi.service.neomedia.*;
    import org.jitsi.util.*;
    
    
    import com.sun.media.rtp.*;
    
    
    /**
     * Implements <tt>RTPTranslator</tt> which represents an RTP translator which
     * forwards RTP and RTCP traffic between multiple <tt>MediaStream</tt>s.
     *
     * @author Lyubomir Marinov
     */
    public class RTPTranslatorImpl
        implements ReceiveStreamListener,
                   RTPTranslator
    {
        /**
         * The <tt>Logger</tt> used by the <tt>RTPTranslatorImpl</tt> class and its
         * instances for logging output.
         */
        private static final Logger logger
            = Logger.getLogger(RTPTranslatorImpl.class);
    
    
        /**
         * The indicator which determines whether the method
         * {@link #createFakeSendStreamIfNecessary()} is to be executed by
         * <tt>RTPTranslatorImpl</tt>.
         */
        private static final boolean CREATE_FAKE_SEND_STREAM_IF_NECESSARY = false;
    
    
        /**
         * An array with <tt>long</tt> element type and no elements explicitly
         * defined to reduce unnecessary allocations.
         */
        private static final long[] EMPTY_LONG_ARRAY = new long[0];
    
        /**
         * The <tt>RTPConnector</tt> which is used by {@link #manager} and which
         * delegates to the <tt>RTPConnector</tt>s of the <tt>StreamRTPManager</tt>s
         * attached to this instance.
         */
        private RTPConnectorImpl connector;
    
    
        /**
         * The <tt>SendStream</tt> created by the <tt>RTPManager</tt> in order to
         * ensure that this <tt>RTPTranslatorImpl</tt> is able to disperse RTP and
         * RTCP received from remote peers even when the local peer is not
         * generating media to be transmitted.
         */
        private SendStream fakeSendStream;
    
    
        /**
         * The <tt>RTPManager</tt> which implements the actual RTP management of
         * this instance.
         */
        private final RTPManager manager = RTPManager.newInstance();
    
        /**
         * The <tt>SendStream</tt>s created by the <tt>RTPManager</tt> and the
         * <tt>StreamRTPManager</tt>-specific views to them.
         */
        private final List<SendStreamDesc> sendStreams
            = new LinkedList<SendStreamDesc>();
    
        /**
         * The list of <tt>StreamRTPManager</tt>s i.e. <tt>MediaStream</tt>s which
         * this instance forwards RTP and RTCP traffic between.
         */
        private final List<StreamRTPManagerDesc> streamRTPManagers
            = new ArrayList<StreamRTPManagerDesc>();
    
        /**
         * Initializes a new <tt>RTPTranslatorImpl</tt> instance.
         */
        public RTPTranslatorImpl()
        {
            manager.addReceiveStreamListener(this);
        }
    
        public synchronized void addFormat(
                StreamRTPManager streamRTPManager,
                Format format, int payloadType)
        {
            manager.addFormat(format, payloadType);
    
            getStreamRTPManagerDesc(streamRTPManager, true)
                .addFormat(format, payloadType);
        }
    
        public synchronized void addReceiveStreamListener(
                StreamRTPManager streamRTPManager,
                ReceiveStreamListener listener)
        {
            getStreamRTPManagerDesc(streamRTPManager, true)
                .addReceiveStreamListener(listener);
        }
    
        public void addRemoteListener(
                StreamRTPManager streamRTPManager,
                RemoteListener listener)
        {
            manager.addRemoteListener(listener);
        }
    
        public void addSendStreamListener(
                StreamRTPManager streamRTPManager,
                SendStreamListener listener)
        {
            // TODO Auto-generated method stub
        }
    
        public void addSessionListener(
                StreamRTPManager streamRTPManager,
                SessionListener listener)
        {
            // TODO Auto-generated method stub
        }
    
    
        /**
         * Closes {@link #fakeSendStream} if it exists and is considered no longer
         * necessary; otherwise, does nothing.
         */
        private synchronized void closeFakeSendStreamIfNotNecessary()
        {
            /*
             * If a SendStream has been created in response to a request from the
             * clients of this RTPTranslator implementation, the newly-created
             * SendStream in question will disperse the received RTP and RTCP from
             * remote peers so fakeSendStream will be obsolete.
             */
            try
            {
    
                if ((!sendStreams.isEmpty() || (streamRTPManagers.size() < 2))
                        && (fakeSendStream != null))
    
                    catch (NullPointerException npe)
                    {
                        /*
                         * Refer to MediaStreamImpl#stopSendStreams(
                         * Iterable<SendStream>, boolean) for an explanation about
                         * the swallowing of the exception.
                         */
                        logger.error("Failed to close fake send stream", npe);
                    }
    
                    finally
                    {
                        fakeSendStream = null;
                    }
                }
            }
            catch (Throwable t)
            {
                if (t instanceof ThreadDeath)
                    throw (ThreadDeath) t;
                else if (logger.isDebugEnabled())
                {
                    logger.debug(
                            "Failed to close the fake SendStream of this"
                                + " RTPTranslator.",
                            t);
                }
            }
        }
    
    
        private synchronized void closeSendStream(SendStreamDesc sendStreamDesc)
        {
            if (sendStreams.contains(sendStreamDesc)
                    && (sendStreamDesc.getSendStreamCount() < 1))
            {
    
                SendStream sendStream = sendStreamDesc.sendStream;
    
                try
                {
                    sendStream.close();
                }
                catch (NullPointerException npe)
                {
                    /*
                     * Refer to MediaStreamImpl#stopSendStreams(
                     * Iterable<SendStream>, boolean) for an explanation about the
                     * swallowing of the exception.
                     */
                    logger.error("Failed to close send stream", npe);
                }
    
                sendStreams.remove(sendStreamDesc);
            }
        }
    
    
        /**
         * Creates {@link #fakeSendStream} if it does not exist yet and is
         * considered necessary; otherwise, does nothing.
         */
    
        private synchronized void createFakeSendStreamIfNecessary()
        {
            /*
             * If no SendStream has been created in response to a request from the
             * clients of this RTPTranslator implementation, it will need
             * fakeSendStream in order to be able to disperse the received RTP and
             * RTCP from remote peers. Additionally, the fakeSendStream is not
             * necessary in the case of a single client of this RTPTranslator
             * because there is no other remote peer to disperse the received RTP
             * and RTCP to.
             */
    
            if (CREATE_FAKE_SEND_STREAM_IF_NECESSARY
                    && (fakeSendStream == null)
    
                    && sendStreams.isEmpty()
                    && (streamRTPManagers.size() > 1))
            {
                Format supportedFormat = null;
    
                for (StreamRTPManagerDesc s : streamRTPManagers)
                {
                    Format[] formats = s.getFormats();
    
                    if ((formats != null) && (formats.length > 0))
                    {
                        for (Format f : formats)
                        {
                            if (f != null)
                            {
                                supportedFormat = f;
                                break;
                            }
                        }
                        if (supportedFormat != null)
                            break;
                    }
                }
                if (supportedFormat != null)
                {
                    try
                    {
                        fakeSendStream
                            = manager.createSendStream(
                                    new FakePushBufferDataSource(supportedFormat),
                                    0);
                    }
                    catch (Throwable t)
                    {
                        if (t instanceof ThreadDeath)
                            throw (ThreadDeath) t;
                        else
                        {
                            logger.error(
                                    "Failed to create a fake SendStream to ensure"
                                        + " that this RTPTranslator is able to"
                                        + " disperse RTP and RTCP received from"
                                        + " remote peers even when the local peer"
                                        + " is not generating media to be"
                                        + " transmitted.",
                                    t);
                        }
                    }
                }
            }
        }
    
    
        public synchronized SendStream createSendStream(
                StreamRTPManager streamRTPManager,
                DataSource dataSource, int streamIndex)
            throws IOException,
                   UnsupportedFormatException
        {
            SendStreamDesc sendStreamDesc = null;
    
            for (SendStreamDesc s : sendStreams)
                if ((s.dataSource == dataSource) && (s.streamIndex == streamIndex))
                {
                    sendStreamDesc = s;
                    break;
                }
            if (sendStreamDesc == null)
            {
                SendStream sendStream
                    = manager.createSendStream(dataSource, streamIndex);
    
                if (sendStream != null)
                {
                    sendStreamDesc
                        = new SendStreamDesc(dataSource, streamIndex, sendStream);
                    sendStreams.add(sendStreamDesc);
    
                }
            }
            return
                (sendStreamDesc == null)
                    ? null
                    : sendStreamDesc.getSendStream(streamRTPManager, true);
        }
    
        /**
         * Releases the resources allocated by this instance in the course of its
         * execution and prepares it to be garbage collected.
         */
        public synchronized void dispose()
        {
            manager.removeReceiveStreamListener(this);
    
            try
            {
                manager.dispose();
            }
            catch (Throwable t)
            {
                if (t instanceof ThreadDeath)
                    throw (ThreadDeath) t;
                else
                {
                    /*
                     * RTPManager.dispose() often throws at least a
                     * NullPointerException in relation to some RTP BYE.
                     */
                    logger.error("Failed to dispose of RTPManager", t);
                }
            }
    
        }
    
        public synchronized void dispose(StreamRTPManager streamRTPManager)
        {
            Iterator<StreamRTPManagerDesc> streamRTPManagerIter
                = streamRTPManagers.iterator();
    
            while (streamRTPManagerIter.hasNext())
            {
                StreamRTPManagerDesc streamRTPManagerDesc
                    = streamRTPManagerIter.next();
    
                if (streamRTPManagerDesc.streamRTPManager == streamRTPManager)
                {
                    RTPConnectorDesc connectorDesc
                        = streamRTPManagerDesc.connectorDesc;
    
                    if (connectorDesc != null)
                    {
                        if (this.connector != null)
                            this.connector.removeConnector(connectorDesc);
    
                        streamRTPManagerDesc.connectorDesc = null;
                    }
    
                    streamRTPManagerIter.remove();
    
                    break;
                }
            }
        }
    
        private synchronized StreamRTPManagerDesc
            findStreamRTPManagerDescByReceiveSSRC(
                long receiveSSRC,
                StreamRTPManagerDesc exclusion)
        {
            for (int i = 0, count = streamRTPManagers.size(); i < count; i++)
            {
                StreamRTPManagerDesc s = streamRTPManagers.get(i);
    
                if ((s != exclusion) && s.containsReceiveSSRC(receiveSSRC))
                    return s;
            }
            return null;
        }
    
        public Object getControl(
                StreamRTPManager streamRTPManager,
                String controlType)
        {
            return manager.getControl(controlType);
        }
    
        public GlobalReceptionStats getGlobalReceptionStats(
                StreamRTPManager streamRTPManager)
        {
            return manager.getGlobalReceptionStats();
        }
    
        public GlobalTransmissionStats getGlobalTransmissionStats(
                StreamRTPManager streamRTPManager)
        {
            return manager.getGlobalTransmissionStats();
        }
    
        public long getLocalSSRC(StreamRTPManager streamRTPManager)
        {
            return ((RTPSessionMgr) manager).getLocalSSRC();
        }
    
        public synchronized Vector<ReceiveStream> getReceiveStreams(
                StreamRTPManager streamRTPManager)
        {
            StreamRTPManagerDesc streamRTPManagerDesc
                = getStreamRTPManagerDesc(streamRTPManager, false);
            Vector<ReceiveStream> receiveStreams = null;
    
            if (streamRTPManagerDesc != null)
            {
                Vector<?> managerReceiveStreams = manager.getReceiveStreams();
    
                if (managerReceiveStreams != null)
                {
                    receiveStreams
                        = new Vector<ReceiveStream>(managerReceiveStreams.size());
                    for (Object s : managerReceiveStreams)
                    {
                        ReceiveStream receiveStream = (ReceiveStream) s;
    
                        if (streamRTPManagerDesc.containsReceiveSSRC(
                                receiveStream.getSSRC()))
                            receiveStreams.add(receiveStream);
                    }
                }
            }
            return receiveStreams;
        }
    
        public synchronized Vector<SendStream> getSendStreams(
                StreamRTPManager streamRTPManager)
        {
            Vector<?> managerSendStreams = manager.getSendStreams();
            Vector<SendStream> sendStreams = null;
    
            if (managerSendStreams != null)
            {
                sendStreams = new Vector<SendStream>(managerSendStreams.size());
                for (SendStreamDesc sendStreamDesc : this.sendStreams)
                    if (managerSendStreams.contains(sendStreamDesc.sendStream))
                    {
                        SendStream sendStream
                            = sendStreamDesc.getSendStream(streamRTPManager, false);
    
                        if (sendStream != null)
                            sendStreams.add(sendStream);
                    }
            }
            return sendStreams;
        }
    
        private synchronized StreamRTPManagerDesc getStreamRTPManagerDesc(
                StreamRTPManager streamRTPManager,
                boolean create)
        {
            for (StreamRTPManagerDesc s : streamRTPManagers)
                if (s.streamRTPManager == streamRTPManager)
                    return s;
    
            StreamRTPManagerDesc s;
    
            if (create)
            {
                s = new StreamRTPManagerDesc(streamRTPManager);
                streamRTPManagers.add(s);
            }
            else
                s = null;
            return s;
        }
    
        public synchronized void initialize(
                StreamRTPManager streamRTPManager,
                RTPConnector connector)
        {
            if (this.connector == null)
            {
                this.connector = new RTPConnectorImpl();
                manager.initialize(this.connector);
            }
    
            StreamRTPManagerDesc streamRTPManagerDesc
                = getStreamRTPManagerDesc(streamRTPManager, true);
            RTPConnectorDesc connectorDesc = streamRTPManagerDesc.connectorDesc;
    
            if ((connectorDesc == null) || (connectorDesc.connector != connector))
            {
                if (connectorDesc != null)
                    this.connector.removeConnector(connectorDesc);
                streamRTPManagerDesc.connectorDesc
                    = connectorDesc
                        = (connector == null)
                            ? null
                            : new RTPConnectorDesc(streamRTPManagerDesc, connector);
                if (connectorDesc != null)
                    this.connector.addConnector(connectorDesc);
            }
        }
    
    
        /**
         * Logs information about an RTCP packet using {@link #logger} for debugging
         * purposes.
         *
         * @param obj the object which is the source of the log request
         * @param methodName the name of the method on <tt>obj</tt> which is the
         * source of the log request
         * @param buffer the <tt>byte</tt>s which (possibly) represent an RTCP
         * packet to be logged for debugging purposes
         * @param offset the position within <tt>buffer</tt> at which the valid data
         * begins
         * @param length the number of bytes in <tt>buffer</tt> which constitute the
         * valid data 
         */
        private static void logRTCP(
                Object obj, String methodName,
                byte[] buffer, int offset, int length)
        {
    
            /*
             * Do the bytes in the specified buffer resemble (a header of) an RTCP
             * packet?
             */
    
            if (length > 8)
            {
                byte b0 = buffer[offset];
                int v = (b0 & 0xc0) >>> 6;
    
                if (v == 2)
                {
                    byte b1 = buffer[offset + 1];
                    int pt = b1 & 0xff;
    
                    if (pt == 203 /* BYE */)
                    {
                        int sc = b0 & 0x1f;
                        long ssrc = (sc > 0) ? readInt(buffer, offset + 4) : -1;
    
                        logger.trace(
    
                                obj.getClass().getName() + '.' + methodName
                                    + ": RTCP BYE v=" + v + "; pt=" + pt + "; ssrc="
                                    + ssrc + ';');
    
        /**
         * Notifies this instance that an RTP or RTCP packet has been received from
         * a peer represented by a specific <tt>PushSourceStreamDesc</tt>.
         *
         * @param streamDesc a <tt>PushSourceStreamDesc</tt> which identifies the
         * peer from which an RTP or RTCP packet has been received
         * @param buffer the buffer which contains the bytes of the received RTP or
         * RTCP packet
         * @param offset the zero-based index in <tt>buffer</tt> at which the bytes
         * of the received RTP or RTCP packet begin
         * @param length the number of bytes in <tt>buffer</tt> beginning at
         * <tt>offset</tt> which are allowed to be accessed
         * @param read the number of bytes in <tt>buffer</tt> beginning at
         * <tt>offset</tt> which represent the received RTP or RTCP packet
         * @return the number of bytes in <tt>buffer</tt> beginning at
         * <tt>offset</tt> which represent the received RTP or RTCP packet 
         * @throws IOException if an I/O error occurs while the method processes the
         * specified RTP or RTCP packet
         */
    
        private int read(
                PushSourceStreamDesc streamDesc,
                byte[] buffer, int offset, int length,
                int read)
            throws IOException
        {
            boolean data = streamDesc.data;
            StreamRTPManagerDesc streamRTPManagerDesc
                = streamDesc.connectorDesc.streamRTPManagerDesc;
            Format format = null;
    
            if (data)
            {
    
                /*
                 * Do the bytes in the specified buffer resemble (a header of) an
                 * RTP packet?
                 */
                if ((length >= 12)
                        && (/* v */ ((buffer[offset] & 0xc0) >>> 6) == 2))
    
                {
                    long ssrc = readInt(buffer, offset + 8);
    
                    if (!streamRTPManagerDesc.containsReceiveSSRC(ssrc))
                    {
                        if (findStreamRTPManagerDescByReceiveSSRC(
                                    ssrc,
                                    streamRTPManagerDesc)
                                == null)
                            streamRTPManagerDesc.addReceiveSSRC(ssrc);
                        else
                            return 0;
                    }
    
    
                    format = streamRTPManagerDesc.getFormat(pt);
    
            else if (logger.isTraceEnabled())
                logRTCP(this, "read", buffer, offset, read);
    
            OutputDataStreamImpl outputStream
                = data
                    ? connector.getDataOutputStream()
                    : connector.getControlOutputStream();
    
            if (outputStream != null)
            {
                outputStream.write(
                        buffer, offset, read,
                        format,
                        streamRTPManagerDesc);
            }
    
            return read;
        }
    
        /**
         * Reads an <tt>int</tt> from a specific <tt>byte</tt> buffer starting at a
         * specific <tt>offset</tt>. The implementation is the same as
         * {@link DataInputStream#readInt()}.
         *
         * @param buffer the <tt>byte</tt> buffer to read an <tt>int</tt> from
         * @param offset the zero-based offset in <tt>buffer</tt> to start reading
         * an <tt>int</tt> from
         * @return an <tt>int</tt> read from the specified <tt>buffer</tt> starting
         * at the specified <tt>offset</tt>
         */
    
        public static int readInt(byte[] buffer, int offset)
    
        {
            return
                ((buffer[offset++] & 0xff) << 24)
                    | ((buffer[offset++] & 0xff) << 16)
                    | ((buffer[offset++] & 0xff) << 8)
                    | (buffer[offset] & 0xff);
        }
    
        public synchronized void removeReceiveStreamListener(
                StreamRTPManager streamRTPManager,
                ReceiveStreamListener listener)
        {
            StreamRTPManagerDesc streamRTPManagerDesc
                = getStreamRTPManagerDesc(streamRTPManager, false);
    
            if (streamRTPManagerDesc != null)
                streamRTPManagerDesc.removeReceiveStreamListener(listener);
        }
    
        public void removeRemoteListener(
                StreamRTPManager streamRTPManager,
                RemoteListener listener)
        {
            manager.removeRemoteListener(listener);
        }
    
        public void removeSendStreamListener(
                StreamRTPManager streamRTPManager,
                SendStreamListener listener)
        {
            // TODO Auto-generated method stub
        }
    
        public void removeSessionListener(
                StreamRTPManager streamRTPManager,
                SessionListener listener)
        {
            // TODO Auto-generated method stub
        }
    
        /**
         * Notifies this <tt>ReceiveStreamListener</tt> about a specific event
         * related to a <tt>ReceiveStream</tt>.
         *
         * @param event a <tt>ReceiveStreamEvent</tt> which contains the specifics
         * of the event this <tt>ReceiveStreamListener</tt> is being notified about
         * @see ReceiveStreamListener#update(ReceiveStreamEvent)
         */
        public void update(ReceiveStreamEvent event)
        {
            /*
             * Because NullPointerException was seen during testing, be thorough
             * with the null checks.
             */
            if (event != null)
            {
                ReceiveStream receiveStream = event.getReceiveStream();
    
                if (receiveStream != null)
                {
                    StreamRTPManagerDesc streamRTPManagerDesc
                        = findStreamRTPManagerDescByReceiveSSRC(
                                receiveStream.getSSRC(),
                                null);
    
                    if (streamRTPManagerDesc != null)
                        for (ReceiveStreamListener listener
                                : streamRTPManagerDesc.getReceiveStreamListeners())
                            listener.update(event);
                }
            }
        }
    
        private static class OutputDataStreamDesc
        {
            public RTPConnectorDesc connectorDesc;
    
            public OutputDataStream stream;
    
            public OutputDataStreamDesc(
                    RTPConnectorDesc connectorDesc,
                    OutputDataStream stream)
            {
                this.connectorDesc = connectorDesc;
                this.stream = stream;
            }
        }
    
        private static class OutputDataStreamImpl
            implements OutputDataStream,
                       Runnable
        {
            private static final int WRITE_QUEUE_CAPACITY
                = RTPConnectorOutputStream
                    .MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY;
    
            private boolean closed;
    
            private final boolean data;
    
            private final List<OutputDataStreamDesc> streams
                = new ArrayList<OutputDataStreamDesc>();
    
            private final RTPTranslatorBuffer[] writeQueue
                = new RTPTranslatorBuffer[WRITE_QUEUE_CAPACITY];
    
            private int writeQueueHead;
    
            private int writeQueueLength;
    
            private Thread writeThread;
    
            public OutputDataStreamImpl(boolean data)
            {
                this.data = data;
            }
    
            public synchronized void addStream(
                    RTPConnectorDesc connectorDesc,
                    OutputDataStream stream)
            {
                for (OutputDataStreamDesc streamDesc : streams)
                    if ((streamDesc.connectorDesc == connectorDesc)
                            && (streamDesc.stream == stream))
                        return;
                streams.add(new OutputDataStreamDesc(connectorDesc, stream));
            }
    
            public synchronized void close()
            {
                closed = true;
                writeThread = null;
                notify();
            }
    
            private synchronized void createWriteThread()
            {
                writeThread = new Thread(this, getClass().getName());
                writeThread.setDaemon(true);
                writeThread.start();
            }
    
            private synchronized int doWrite(
                    byte[] buffer, int offset, int length,
                    Format format,
                    StreamRTPManagerDesc exclusion)
            {
                int write = 0;
    
                for (int streamIndex = 0, streamCount = streams.size();
                        streamIndex < streamCount;
                        streamIndex++)
                {
                    OutputDataStreamDesc streamDesc = streams.get(streamIndex);
                    StreamRTPManagerDesc streamRTPManagerDesc
                        = streamDesc.connectorDesc.streamRTPManagerDesc;
    
                    if (streamRTPManagerDesc != exclusion)
                    {
    
                                Integer payloadType
                                    = streamRTPManagerDesc.getPayloadType(format);
    
                                if ((payloadType == null) && (exclusion != null))
                                    payloadType = exclusion.getPayloadType(format);
                                if (payloadType != null)
                                {
                                    int payloadTypeByteIndex = offset + 1;
    
                                    buffer[payloadTypeByteIndex]
                                        = (byte)
                                            ((buffer[payloadTypeByteIndex] & 0x80)
                                                | (payloadType & 0x7f));
                                }
    
                        else if (logger.isTraceEnabled())
                        {
                            logRTCP(
                                    this, "doWrite",
                                    buffer, offset, length);
                        }
    
    
                        int streamWrite
                            = streamDesc.stream.write(buffer, offset, length);
    
                        if (write < streamWrite)
                            write = streamWrite;
                    }
                }
                return write;
            }
    
            public synchronized void removeStreams(RTPConnectorDesc connectorDesc)
            {
                Iterator<OutputDataStreamDesc> streamIter = streams.iterator();
    
                while (streamIter.hasNext())
                {
                    OutputDataStreamDesc streamDesc = streamIter.next();
    
                    if (streamDesc.connectorDesc == connectorDesc)
                        streamIter.remove();
                }
            }
    
            public void run()
            {
                try
                {
                    while (true)
                    {
                        int writeIndex;
                        byte[] buffer;
                        StreamRTPManagerDesc exclusion;
                        Format format;
                        int length;
    
                        synchronized (this)
                        {
                            if (closed
                                    || !Thread.currentThread().equals(writeThread))
                                break;
                            if (writeQueueLength < 1)
                            {
                                boolean interrupted = false;
    
                                try
                                {
                                    wait();
                                }
                                catch (InterruptedException ie)
                                {
                                    interrupted = true;
                                }
                                if (interrupted)
                                    Thread.currentThread().interrupt();
                                continue;
                            }
    
                            writeIndex = writeQueueHead;
    
                            RTPTranslatorBuffer write = writeQueue[writeIndex];
    
                            buffer = write.data;
                            write.data = null;
                            exclusion = write.exclusion;
                            write.exclusion = null;
                            format = write.format;
                            write.format = null;
                            length = write.length;
                            write.length = 0;
    
                            writeQueueHead++;
                            if (writeQueueHead >= writeQueue.length)
                                writeQueueHead = 0;
                            writeQueueLength--;
                        }
    
                        try
                        {
                            doWrite(buffer, 0, length, format, exclusion);
                        }
                        finally
                        {
                            synchronized (this)
                            {
                                RTPTranslatorBuffer write = writeQueue[writeIndex];
    
                                if ((write != null) && (write.data == null))
                                    write.data = buffer;
                            }
                        }
                    }
                }
                catch (Throwable t)
                {
                    logger.error("Failed to translate RTP packet", t);
                    if (t instanceof ThreadDeath)
                        throw (ThreadDeath) t;
                }
                finally
                {
                    synchronized (this)
                    {
                        if (Thread.currentThread().equals(writeThread))
                            writeThread = null;
                        if (!closed
                                && (writeThread == null)
                                && (writeQueueLength > 0))
                            createWriteThread();
                    }
                }
            }
    
            public int write(byte[] buffer, int offset, int length)
            {
                return doWrite(buffer, offset, length, null, null);
            }
    
            public synchronized void write(
                    byte[] buffer, int offset, int length,
                    Format format,
                    StreamRTPManagerDesc exclusion)
            {
                if (closed)
                    return;
    
                int writeIndex;
    
                if (writeQueueLength < writeQueue.length)
                {
                    writeIndex
                        = (writeQueueHead + writeQueueLength) % writeQueue.length;
                }
                else
                {
                    writeIndex = writeQueueHead;
                    writeQueueHead++;
                    if (writeQueueHead >= writeQueue.length)
                        writeQueueHead = 0;
                    writeQueueLength--;
                    logger.warn("Will not translate RTP packet.");
                }
    
                RTPTranslatorBuffer write
                    = writeQueue[writeIndex];
    
                if (write == null)
                    writeQueue[writeIndex] = write = new RTPTranslatorBuffer();
    
                byte[] data = write.data;
    
                if ((data == null) || (data.length < length))
                    write.data = data = new byte[length];
                System.arraycopy(buffer, offset, data, 0, length);
    
                write.exclusion = exclusion;
                write.format = format;
                write.length = length;
    
                writeQueueLength++;
    
                if (writeThread == null)
                    createWriteThread();
                else
                    notify();
            }
        }
    
        private static class PushSourceStreamDesc