/* * Jitsi, the OpenSource Java VoIP and Instant Messaging client. * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jitsi.impl.neomedia; import java.io.*; import java.lang.reflect.*; import java.util.*; import javax.media.*; import javax.media.format.*; import javax.media.protocol.*; import javax.media.rtp.*; import javax.media.rtp.event.*; import javax.media.rtp.rtcp.*; import org.jitsi.impl.neomedia.protocol.*; import org.jitsi.service.neomedia.*; import org.jitsi.util.*; import com.sun.media.rtp.*; /** * Implements <tt>RTPTranslator</tt> which represents an RTP translator which * forwards RTP and RTCP traffic between multiple <tt>MediaStream</tt>s. * * @author Lyubomir Marinov */ public class RTPTranslatorImpl implements ReceiveStreamListener, RTPTranslator { /** * The <tt>Logger</tt> used by the <tt>RTPTranslatorImpl</tt> class and its * instances for logging output. */ private static final Logger logger = Logger.getLogger(RTPTranslatorImpl.class); /** * The indicator which determines whether the method * {@link #createFakeSendStreamIfNecessary()} is to be executed by * <tt>RTPTranslatorImpl</tt>. */ private static final boolean CREATE_FAKE_SEND_STREAM_IF_NECESSARY = false; /** * An array with <tt>long</tt> element type and no elements explicitly * defined to reduce unnecessary allocations. */ private static final long[] EMPTY_LONG_ARRAY = new long[0]; /** * The <tt>RTPConnector</tt> which is used by {@link #manager} and which * delegates to the <tt>RTPConnector</tt>s of the <tt>StreamRTPManager</tt>s * attached to this instance. */ private RTPConnectorImpl connector; /** * The <tt>SendStream</tt> created by the <tt>RTPManager</tt> in order to * ensure that this <tt>RTPTranslatorImpl</tt> is able to disperse RTP and * RTCP received from remote peers even when the local peer is not * generating media to be transmitted. */ private SendStream fakeSendStream; /** * The <tt>RTPManager</tt> which implements the actual RTP management of * this instance. */ private final RTPManager manager = RTPManager.newInstance(); /** * The <tt>SendStream</tt>s created by the <tt>RTPManager</tt> and the * <tt>StreamRTPManager</tt>-specific views to them. */ private final List<SendStreamDesc> sendStreams = new LinkedList<SendStreamDesc>(); /** * The list of <tt>StreamRTPManager</tt>s i.e. <tt>MediaStream</tt>s which * this instance forwards RTP and RTCP traffic between. */ private final List<StreamRTPManagerDesc> streamRTPManagers = new ArrayList<StreamRTPManagerDesc>(); /** * Initializes a new <tt>RTPTranslatorImpl</tt> instance. */ public RTPTranslatorImpl() { manager.addReceiveStreamListener(this); } public synchronized void addFormat( StreamRTPManager streamRTPManager, Format format, int payloadType) { manager.addFormat(format, payloadType); getStreamRTPManagerDesc(streamRTPManager, true) .addFormat(format, payloadType); } public synchronized void addReceiveStreamListener( StreamRTPManager streamRTPManager, ReceiveStreamListener listener) { getStreamRTPManagerDesc(streamRTPManager, true) .addReceiveStreamListener(listener); } public void addRemoteListener( StreamRTPManager streamRTPManager, RemoteListener listener) { manager.addRemoteListener(listener); } public void addSendStreamListener( StreamRTPManager streamRTPManager, SendStreamListener listener) { // TODO Auto-generated method stub } public void addSessionListener( StreamRTPManager streamRTPManager, SessionListener listener) { // TODO Auto-generated method stub } /** * Closes {@link #fakeSendStream} if it exists and is considered no longer * necessary; otherwise, does nothing. */ private synchronized void closeFakeSendStreamIfNotNecessary() { /* * If a SendStream has been created in response to a request from the * clients of this RTPTranslator implementation, the newly-created * SendStream in question will disperse the received RTP and RTCP from * remote peers so fakeSendStream will be obsolete. */ try { if ((!sendStreams.isEmpty() || (streamRTPManagers.size() < 2)) && (fakeSendStream != null)) { try { fakeSendStream.close(); } catch (NullPointerException npe) { /* * Refer to MediaStreamImpl#stopSendStreams( * Iterable<SendStream>, boolean) for an explanation about * the swallowing of the exception. */ logger.error("Failed to close fake send stream", npe); } finally { fakeSendStream = null; } } } catch (Throwable t) { if (t instanceof ThreadDeath) throw (ThreadDeath) t; else if (logger.isDebugEnabled()) { logger.debug( "Failed to close the fake SendStream of this" + " RTPTranslator.", t); } } } private synchronized void closeSendStream(SendStreamDesc sendStreamDesc) { if (sendStreams.contains(sendStreamDesc) && (sendStreamDesc.getSendStreamCount() < 1)) { SendStream sendStream = sendStreamDesc.sendStream; try { sendStream.close(); } catch (NullPointerException npe) { /* * Refer to MediaStreamImpl#stopSendStreams( * Iterable<SendStream>, boolean) for an explanation about the * swallowing of the exception. */ logger.error("Failed to close send stream", npe); } sendStreams.remove(sendStreamDesc); } } /** * Creates {@link #fakeSendStream} if it does not exist yet and is * considered necessary; otherwise, does nothing. */ private synchronized void createFakeSendStreamIfNecessary() { /* * If no SendStream has been created in response to a request from the * clients of this RTPTranslator implementation, it will need * fakeSendStream in order to be able to disperse the received RTP and * RTCP from remote peers. Additionally, the fakeSendStream is not * necessary in the case of a single client of this RTPTranslator * because there is no other remote peer to disperse the received RTP * and RTCP to. */ if (CREATE_FAKE_SEND_STREAM_IF_NECESSARY && (fakeSendStream == null) && sendStreams.isEmpty() && (streamRTPManagers.size() > 1)) { Format supportedFormat = null; for (StreamRTPManagerDesc s : streamRTPManagers) { Format[] formats = s.getFormats(); if ((formats != null) && (formats.length > 0)) { for (Format f : formats) { if (f != null) { supportedFormat = f; break; } } if (supportedFormat != null) break; } } if (supportedFormat != null) { try { fakeSendStream = manager.createSendStream( new FakePushBufferDataSource(supportedFormat), 0); } catch (Throwable t) { if (t instanceof ThreadDeath) throw (ThreadDeath) t; else { logger.error( "Failed to create a fake SendStream to ensure" + " that this RTPTranslator is able to" + " disperse RTP and RTCP received from" + " remote peers even when the local peer" + " is not generating media to be" + " transmitted.", t); } } } } } public synchronized SendStream createSendStream( StreamRTPManager streamRTPManager, DataSource dataSource, int streamIndex) throws IOException, UnsupportedFormatException { SendStreamDesc sendStreamDesc = null; for (SendStreamDesc s : sendStreams) if ((s.dataSource == dataSource) && (s.streamIndex == streamIndex)) { sendStreamDesc = s; break; } if (sendStreamDesc == null) { SendStream sendStream = manager.createSendStream(dataSource, streamIndex); if (sendStream != null) { sendStreamDesc = new SendStreamDesc(dataSource, streamIndex, sendStream); sendStreams.add(sendStreamDesc); closeFakeSendStreamIfNotNecessary(); } } return (sendStreamDesc == null) ? null : sendStreamDesc.getSendStream(streamRTPManager, true); } /** * Releases the resources allocated by this instance in the course of its * execution and prepares it to be garbage collected. */ public synchronized void dispose() { manager.removeReceiveStreamListener(this); try { manager.dispose(); } catch (Throwable t) { if (t instanceof ThreadDeath) throw (ThreadDeath) t; else { /* * RTPManager.dispose() often throws at least a * NullPointerException in relation to some RTP BYE. */ logger.error("Failed to dispose of RTPManager", t); } } } public synchronized void dispose(StreamRTPManager streamRTPManager) { Iterator<StreamRTPManagerDesc> streamRTPManagerIter = streamRTPManagers.iterator(); while (streamRTPManagerIter.hasNext()) { StreamRTPManagerDesc streamRTPManagerDesc = streamRTPManagerIter.next(); if (streamRTPManagerDesc.streamRTPManager == streamRTPManager) { RTPConnectorDesc connectorDesc = streamRTPManagerDesc.connectorDesc; if (connectorDesc != null) { if (this.connector != null) { this.connector.removeConnector(connectorDesc); connectorDesc.connector.close(); } streamRTPManagerDesc.connectorDesc = null; } streamRTPManagerIter.remove(); closeFakeSendStreamIfNotNecessary(); break; } } } private synchronized StreamRTPManagerDesc findStreamRTPManagerDescByReceiveSSRC( long receiveSSRC, StreamRTPManagerDesc exclusion) { for (int i = 0, count = streamRTPManagers.size(); i < count; i++) { StreamRTPManagerDesc s = streamRTPManagers.get(i); if ((s != exclusion) && s.containsReceiveSSRC(receiveSSRC)) return s; } return null; } public Object getControl( StreamRTPManager streamRTPManager, String controlType) { return manager.getControl(controlType); } public GlobalReceptionStats getGlobalReceptionStats( StreamRTPManager streamRTPManager) { return manager.getGlobalReceptionStats(); } public GlobalTransmissionStats getGlobalTransmissionStats( StreamRTPManager streamRTPManager) { return manager.getGlobalTransmissionStats(); } public long getLocalSSRC(StreamRTPManager streamRTPManager) { return ((RTPSessionMgr) manager).getLocalSSRC(); } public synchronized Vector<ReceiveStream> getReceiveStreams( StreamRTPManager streamRTPManager) { StreamRTPManagerDesc streamRTPManagerDesc = getStreamRTPManagerDesc(streamRTPManager, false); Vector<ReceiveStream> receiveStreams = null; if (streamRTPManagerDesc != null) { Vector<?> managerReceiveStreams = manager.getReceiveStreams(); if (managerReceiveStreams != null) { receiveStreams = new Vector<ReceiveStream>(managerReceiveStreams.size()); for (Object s : managerReceiveStreams) { ReceiveStream receiveStream = (ReceiveStream) s; if (streamRTPManagerDesc.containsReceiveSSRC( receiveStream.getSSRC())) receiveStreams.add(receiveStream); } } } return receiveStreams; } public synchronized Vector<SendStream> getSendStreams( StreamRTPManager streamRTPManager) { Vector<?> managerSendStreams = manager.getSendStreams(); Vector<SendStream> sendStreams = null; if (managerSendStreams != null) { sendStreams = new Vector<SendStream>(managerSendStreams.size()); for (SendStreamDesc sendStreamDesc : this.sendStreams) if (managerSendStreams.contains(sendStreamDesc.sendStream)) { SendStream sendStream = sendStreamDesc.getSendStream(streamRTPManager, false); if (sendStream != null) sendStreams.add(sendStream); } } return sendStreams; } private synchronized StreamRTPManagerDesc getStreamRTPManagerDesc( StreamRTPManager streamRTPManager, boolean create) { for (StreamRTPManagerDesc s : streamRTPManagers) if (s.streamRTPManager == streamRTPManager) return s; StreamRTPManagerDesc s; if (create) { s = new StreamRTPManagerDesc(streamRTPManager); streamRTPManagers.add(s); } else s = null; return s; } public synchronized void initialize( StreamRTPManager streamRTPManager, RTPConnector connector) { if (this.connector == null) { this.connector = new RTPConnectorImpl(); manager.initialize(this.connector); } StreamRTPManagerDesc streamRTPManagerDesc = getStreamRTPManagerDesc(streamRTPManager, true); RTPConnectorDesc connectorDesc = streamRTPManagerDesc.connectorDesc; if ((connectorDesc == null) || (connectorDesc.connector != connector)) { if (connectorDesc != null) this.connector.removeConnector(connectorDesc); streamRTPManagerDesc.connectorDesc = connectorDesc = (connector == null) ? null : new RTPConnectorDesc(streamRTPManagerDesc, connector); if (connectorDesc != null) this.connector.addConnector(connectorDesc); } } /** * Logs information about an RTCP packet using {@link #logger} for debugging * purposes. * * @param obj the object which is the source of the log request * @param methodName the name of the method on <tt>obj</tt> which is the * source of the log request * @param buffer the <tt>byte</tt>s which (possibly) represent an RTCP * packet to be logged for debugging purposes * @param offset the position within <tt>buffer</tt> at which the valid data * begins * @param length the number of bytes in <tt>buffer</tt> which constitute the * valid data */ private static void logRTCP( Object obj, String methodName, byte[] buffer, int offset, int length) { if (length > 8) { byte b0 = buffer[offset]; int v = (b0 & 0xc0) >>> 6; if (v == 2) { byte b1 = buffer[offset + 1]; int pt = b1 & 0xff; if (pt == 203 /* BYE */) { int sc = b0 & 0x1f; long ssrc = (sc > 0) ? readInt(buffer, offset + 4) : -1; logger.trace( obj.getClass().getName() + '.' + methodName + ": RTCP BYE v=" + v + "; pt=" + pt + "; ssrc=" + ssrc + ';'); } } } } private int read( PushSourceStreamDesc streamDesc, byte[] buffer, int offset, int length, int read) throws IOException { boolean data = streamDesc.data; StreamRTPManagerDesc streamRTPManagerDesc = streamDesc.connectorDesc.streamRTPManagerDesc; Format format = null; if (data) { if (length >= 12) { long ssrc = readInt(buffer, offset + 8); if (!streamRTPManagerDesc.containsReceiveSSRC(ssrc)) { if (findStreamRTPManagerDescByReceiveSSRC( ssrc, streamRTPManagerDesc) == null) streamRTPManagerDesc.addReceiveSSRC(ssrc); else return 0; } int payloadType = buffer[offset + 1] & 0x7f; format = streamRTPManagerDesc.getFormat(payloadType); } } else if (logger.isTraceEnabled()) logRTCP(this, "read", buffer, offset, read); createFakeSendStreamIfNecessary(); OutputDataStreamImpl outputStream = data ? connector.getDataOutputStream() : connector.getControlOutputStream(); if (outputStream != null) { outputStream.write( buffer, offset, read, format, streamRTPManagerDesc); } return read; } /** * Reads an <tt>int</tt> from a specific <tt>byte</tt> buffer starting at a * specific <tt>offset</tt>. The implementation is the same as * {@link DataInputStream#readInt()}. * * @param buffer the <tt>byte</tt> buffer to read an <tt>int</tt> from * @param offset the zero-based offset in <tt>buffer</tt> to start reading * an <tt>int</tt> from * @return an <tt>int</tt> read from the specified <tt>buffer</tt> starting * at the specified <tt>offset</tt> */ private static int readInt(byte[] buffer, int offset) { return ((buffer[offset++] & 0xff) << 24) | ((buffer[offset++] & 0xff) << 16) | ((buffer[offset++] & 0xff) << 8) | (buffer[offset] & 0xff); } public synchronized void removeReceiveStreamListener( StreamRTPManager streamRTPManager, ReceiveStreamListener listener) { StreamRTPManagerDesc streamRTPManagerDesc = getStreamRTPManagerDesc(streamRTPManager, false); if (streamRTPManagerDesc != null) streamRTPManagerDesc.removeReceiveStreamListener(listener); } public void removeRemoteListener( StreamRTPManager streamRTPManager, RemoteListener listener) { manager.removeRemoteListener(listener); } public void removeSendStreamListener( StreamRTPManager streamRTPManager, SendStreamListener listener) { // TODO Auto-generated method stub } public void removeSessionListener( StreamRTPManager streamRTPManager, SessionListener listener) { // TODO Auto-generated method stub } /** * Notifies this <tt>ReceiveStreamListener</tt> about a specific event * related to a <tt>ReceiveStream</tt>. * * @param event a <tt>ReceiveStreamEvent</tt> which contains the specifics * of the event this <tt>ReceiveStreamListener</tt> is being notified about * @see ReceiveStreamListener#update(ReceiveStreamEvent) */ public void update(ReceiveStreamEvent event) { /* * Because NullPointerException was seen during testing, be thorough * with the null checks. */ if (event != null) { ReceiveStream receiveStream = event.getReceiveStream(); if (receiveStream != null) { StreamRTPManagerDesc streamRTPManagerDesc = findStreamRTPManagerDescByReceiveSSRC( receiveStream.getSSRC(), null); if (streamRTPManagerDesc != null) for (ReceiveStreamListener listener : streamRTPManagerDesc.getReceiveStreamListeners()) listener.update(event); } } } private static class OutputDataStreamDesc { public RTPConnectorDesc connectorDesc; public OutputDataStream stream; public OutputDataStreamDesc( RTPConnectorDesc connectorDesc, OutputDataStream stream) { this.connectorDesc = connectorDesc; this.stream = stream; } } private static class OutputDataStreamImpl implements OutputDataStream, Runnable { private static final int WRITE_QUEUE_CAPACITY = RTPConnectorOutputStream .MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY; private boolean closed; private final boolean data; private final List<OutputDataStreamDesc> streams = new ArrayList<OutputDataStreamDesc>(); private final RTPTranslatorBuffer[] writeQueue = new RTPTranslatorBuffer[WRITE_QUEUE_CAPACITY]; private int writeQueueHead; private int writeQueueLength; private Thread writeThread; public OutputDataStreamImpl(boolean data) { this.data = data; } public synchronized void addStream( RTPConnectorDesc connectorDesc, OutputDataStream stream) { for (OutputDataStreamDesc streamDesc : streams) if ((streamDesc.connectorDesc == connectorDesc) && (streamDesc.stream == stream)) return; streams.add(new OutputDataStreamDesc(connectorDesc, stream)); } public synchronized void close() { closed = true; writeThread = null; notify(); } private synchronized void createWriteThread() { writeThread = new Thread(this, getClass().getName()); writeThread.setDaemon(true); writeThread.start(); } private synchronized int doWrite( byte[] buffer, int offset, int length, Format format, StreamRTPManagerDesc exclusion) { int write = 0; for (int streamIndex = 0, streamCount = streams.size(); streamIndex < streamCount; streamIndex++) { OutputDataStreamDesc streamDesc = streams.get(streamIndex); StreamRTPManagerDesc streamRTPManagerDesc = streamDesc.connectorDesc.streamRTPManagerDesc; if (streamRTPManagerDesc != exclusion) { if (data) { if ((format != null) && (length > 0)) { Integer payloadType = streamRTPManagerDesc.getPayloadType(format); if ((payloadType == null) && (exclusion != null)) payloadType = exclusion.getPayloadType(format); if (payloadType != null) { int payloadTypeByteIndex = offset + 1; buffer[payloadTypeByteIndex] = (byte) ((buffer[payloadTypeByteIndex] & 0x80) | (payloadType & 0x7f)); } } } else if (logger.isTraceEnabled()) { logRTCP( this, "doWrite", buffer, offset, length); } int streamWrite = streamDesc.stream.write(buffer, offset, length); if (write < streamWrite) write = streamWrite; } } return write; } public synchronized void removeStreams(RTPConnectorDesc connectorDesc) { Iterator<OutputDataStreamDesc> streamIter = streams.iterator(); while (streamIter.hasNext()) { OutputDataStreamDesc streamDesc = streamIter.next(); if (streamDesc.connectorDesc == connectorDesc) streamIter.remove(); } } public void run() { try { while (true) { int writeIndex; byte[] buffer; StreamRTPManagerDesc exclusion; Format format; int length; synchronized (this) { if (closed || !Thread.currentThread().equals(writeThread)) break; if (writeQueueLength < 1) { boolean interrupted = false; try { wait(); } catch (InterruptedException ie) { interrupted = true; } if (interrupted) Thread.currentThread().interrupt(); continue; } writeIndex = writeQueueHead; RTPTranslatorBuffer write = writeQueue[writeIndex]; buffer = write.data; write.data = null; exclusion = write.exclusion; write.exclusion = null; format = write.format; write.format = null; length = write.length; write.length = 0; writeQueueHead++; if (writeQueueHead >= writeQueue.length) writeQueueHead = 0; writeQueueLength--; } try { doWrite(buffer, 0, length, format, exclusion); } finally { synchronized (this) { RTPTranslatorBuffer write = writeQueue[writeIndex]; if ((write != null) && (write.data == null)) write.data = buffer; } } } } catch (Throwable t) { logger.error("Failed to translate RTP packet", t); if (t instanceof ThreadDeath) throw (ThreadDeath) t; } finally { synchronized (this) { if (Thread.currentThread().equals(writeThread)) writeThread = null; if (!closed && (writeThread == null) && (writeQueueLength > 0)) createWriteThread(); } } } public int write(byte[] buffer, int offset, int length) { return doWrite(buffer, offset, length, null, null); } public synchronized void write( byte[] buffer, int offset, int length, Format format, StreamRTPManagerDesc exclusion) { if (closed) return; int writeIndex; if (writeQueueLength < writeQueue.length) { writeIndex = (writeQueueHead + writeQueueLength) % writeQueue.length; } else { writeIndex = writeQueueHead; writeQueueHead++; if (writeQueueHead >= writeQueue.length) writeQueueHead = 0; writeQueueLength--; logger.warn("Will not translate RTP packet."); } RTPTranslatorBuffer write = writeQueue[writeIndex]; if (write == null) writeQueue[writeIndex] = write = new RTPTranslatorBuffer(); byte[] data = write.data; if ((data == null) || (data.length < length)) write.data = data = new byte[length]; System.arraycopy(buffer, offset, data, 0, length); write.exclusion = exclusion; write.format = format; write.length = length; writeQueueLength++; if (writeThread == null) createWriteThread(); else notify(); } } private static class PushSourceStreamDesc { public final RTPConnectorDesc connectorDesc; public final boolean data; public final PushSourceStream stream; public PushSourceStreamDesc( RTPConnectorDesc connectorDesc, PushSourceStream stream, boolean data) { this.connectorDesc = connectorDesc; this.stream = stream; this.data = data; } } private class PushSourceStreamImpl implements PushSourceStream, SourceTransferHandler { private final boolean data; private final List<PushSourceStreamDesc> streams = new LinkedList<PushSourceStreamDesc>(); private PushSourceStreamDesc streamToReadFrom; private SourceTransferHandler transferHandler; public PushSourceStreamImpl(boolean data) { this.data = data; } public synchronized void addStream( RTPConnectorDesc connectorDesc, PushSourceStream stream) { for (PushSourceStreamDesc streamDesc : streams) if ((streamDesc.connectorDesc == connectorDesc) && (streamDesc.stream == stream)) return; streams.add( new PushSourceStreamDesc(connectorDesc, stream, this.data)); stream.setTransferHandler(this); } public void close() { // TODO Auto-generated method stub } public boolean endOfStream() { // TODO Auto-generated method stub return false; } public ContentDescriptor getContentDescriptor() { // TODO Auto-generated method stub return null; } public long getContentLength() { return LENGTH_UNKNOWN; } public Object getControl(String controlType) { // TODO Auto-generated method stub return null; } public Object[] getControls() { // TODO Auto-generated method stub return null; } public synchronized int getMinimumTransferSize() { int minimumTransferSize = 0; for (PushSourceStreamDesc streamDesc : streams) { int streamMinimumTransferSize = streamDesc.stream.getMinimumTransferSize(); if (minimumTransferSize < streamMinimumTransferSize) minimumTransferSize = streamMinimumTransferSize; } return minimumTransferSize; } public int read(byte[] buffer, int offset, int length) throws IOException { PushSourceStreamDesc streamDesc; int read; synchronized (this) { streamDesc = streamToReadFrom; read = (streamDesc == null) ? 0 : streamDesc.stream.read(buffer, offset, length); } if (read > 0) { read = RTPTranslatorImpl.this.read( streamDesc, buffer, offset, length, read); } return read; } public synchronized void removeStreams(RTPConnectorDesc connectorDesc) { Iterator<PushSourceStreamDesc> streamIter = streams.iterator(); while (streamIter.hasNext()) { PushSourceStreamDesc streamDesc = streamIter.next(); if (streamDesc.connectorDesc == connectorDesc) { streamDesc.stream.setTransferHandler(null); streamIter.remove(); if (streamToReadFrom == streamDesc) streamToReadFrom = null; } } } public synchronized void setTransferHandler( SourceTransferHandler transferHandler) { if (this.transferHandler != transferHandler) { this.transferHandler = transferHandler; for (PushSourceStreamDesc streamDesc : streams) streamDesc.stream.setTransferHandler(this); } } public synchronized void transferData(PushSourceStream stream) { SourceTransferHandler transferHandler = null; for (PushSourceStreamDesc streamDesc : streams) if (streamDesc.stream == stream) { streamToReadFrom = streamDesc; transferHandler = this.transferHandler; break; } if (transferHandler != null) transferHandler.transferData(this); } } private static class RTPConnectorDesc { public final RTPConnector connector; public final StreamRTPManagerDesc streamRTPManagerDesc; public RTPConnectorDesc( StreamRTPManagerDesc streamRTPManagerDesc, RTPConnector connector) { this.streamRTPManagerDesc = streamRTPManagerDesc; this.connector = connector; } } /** * Implements the <tt>RTPConnector</tt> with which this instance initializes * its <tt>RTPManager</tt>. It delegates to the <tt>RTPConnector</tt> of the * various <tt>StreamRTPManager</tt>s. */ private class RTPConnectorImpl implements RTPConnector { /** * The <tt>RTPConnector</tt>s this instance delegates to. */ private final List<RTPConnectorDesc> connectors = new LinkedList<RTPConnectorDesc>(); private PushSourceStreamImpl controlInputStream; private OutputDataStreamImpl controlOutputStream; private PushSourceStreamImpl dataInputStream; private OutputDataStreamImpl dataOutputStream; public synchronized void addConnector(RTPConnectorDesc connector) { if (!connectors.contains(connector)) { connectors.add(connector); if (this.controlInputStream != null) { PushSourceStream controlInputStream = null; try { controlInputStream = connector.connector.getControlInputStream(); } catch (IOException ioe) { throw new UndeclaredThrowableException(ioe); } if (controlInputStream != null) { this.controlInputStream.addStream( connector, controlInputStream); } } if (this.controlOutputStream != null) { OutputDataStream controlOutputStream = null; try { controlOutputStream = connector.connector.getControlOutputStream(); } catch (IOException ioe) { throw new UndeclaredThrowableException(ioe); } if (controlOutputStream != null) { this.controlOutputStream.addStream( connector, controlOutputStream); } } if (this.dataInputStream != null) { PushSourceStream dataInputStream = null; try { dataInputStream = connector.connector.getDataInputStream(); } catch (IOException ioe) { throw new UndeclaredThrowableException(ioe); } if (dataInputStream != null) { this.dataInputStream.addStream( connector, dataInputStream); } } if (this.dataOutputStream != null) { OutputDataStream dataOutputStream = null; try { dataOutputStream = connector.connector.getDataOutputStream(); } catch (IOException ioe) { throw new UndeclaredThrowableException(ioe); } if (dataOutputStream != null) { this.dataOutputStream.addStream( connector, dataOutputStream); } } } } public synchronized void close() { if (controlInputStream != null) { controlInputStream.close(); controlInputStream = null; } if (controlOutputStream != null) { controlOutputStream.close(); controlOutputStream = null; } if (dataInputStream != null) { dataInputStream.close(); dataInputStream = null; } if (dataOutputStream != null) { dataOutputStream.close(); dataOutputStream = null; } for (RTPConnectorDesc connectorDesc : connectors) connectorDesc.connector.close(); } public synchronized PushSourceStream getControlInputStream() throws IOException { if (this.controlInputStream == null) { this.controlInputStream = new PushSourceStreamImpl(false); for (RTPConnectorDesc connectorDesc : connectors) { PushSourceStream controlInputStream = connectorDesc.connector.getControlInputStream(); if (controlInputStream != null) { this.controlInputStream.addStream( connectorDesc, controlInputStream); } } } return this.controlInputStream; } public synchronized OutputDataStreamImpl getControlOutputStream() throws IOException { if (this.controlOutputStream == null) { this.controlOutputStream = new OutputDataStreamImpl(false); for (RTPConnectorDesc connectorDesc : connectors) { OutputDataStream controlOutputStream = connectorDesc.connector.getControlOutputStream(); if (controlOutputStream != null) { this.controlOutputStream.addStream( connectorDesc, controlOutputStream); } } } return this.controlOutputStream; } public synchronized PushSourceStream getDataInputStream() throws IOException { if (this.dataInputStream == null) { this.dataInputStream = new PushSourceStreamImpl(true); for (RTPConnectorDesc connectorDesc : connectors) { PushSourceStream dataInputStream = connectorDesc.connector.getDataInputStream(); if (dataInputStream != null) { this.dataInputStream.addStream( connectorDesc, dataInputStream); } } } return this.dataInputStream; } public synchronized OutputDataStreamImpl getDataOutputStream() throws IOException { if (this.dataOutputStream == null) { this.dataOutputStream = new OutputDataStreamImpl(true); for (RTPConnectorDesc connectorDesc : connectors) { OutputDataStream dataOutputStream = connectorDesc.connector.getDataOutputStream(); if (dataOutputStream != null) { this.dataOutputStream.addStream( connectorDesc, dataOutputStream); } } } return this.dataOutputStream; } public int getReceiveBufferSize() { return -1; } public double getRTCPBandwidthFraction() { return -1; } public double getRTCPSenderBandwidthFraction() { return -1; } public int getSendBufferSize() { return -1; } public synchronized void removeConnector(RTPConnectorDesc connector) { if (connectors.contains(connector)) { if (controlInputStream != null) controlInputStream.removeStreams(connector); if (controlOutputStream != null) controlOutputStream.removeStreams(connector); if (dataInputStream != null) dataInputStream.removeStreams(connector); if (dataOutputStream != null) dataOutputStream.removeStreams(connector); connectors.remove(connector); } } public void setReceiveBufferSize(int receiveBufferSize) throws IOException { // TODO Auto-generated method stub } public void setSendBufferSize(int sendBufferSize) throws IOException { // TODO Auto-generated method stub } } private static class RTPTranslatorBuffer { public byte[] data; public StreamRTPManagerDesc exclusion; public Format format; public int length; } /** * Describes a <tt>SendStream</tt> created by the <tt>RTPManager</tt> of * this instance. Contains information about the <tt>DataSource</tt> and its * stream index from which the <tt>SendStream</tt> has been created so that * various <tt>StreamRTPManager</tt> receive different views of one and the * same <tt>SendStream</tt>. */ private class SendStreamDesc { /** * The <tt>DataSource</tt> from which {@link #sendStream} has been * created. */ public final DataSource dataSource; /** * The <tt>SendStream</tt> created from the stream of * {@link #dataSource} at index {@link #streamIndex}. */ public final SendStream sendStream; /** * The list of <tt>StreamRTPManager</tt>-specific views to * {@link #sendStream}. */ private final List<SendStreamImpl> sendStreams = new LinkedList<SendStreamImpl>(); /** * The number of <tt>StreamRTPManager</tt>s which have started their * views of {@link #sendStream}. */ private int started; /** * The index of the stream of {@link #dataSource} from which * {@link #sendStream} has been created. */ public final int streamIndex; public SendStreamDesc( DataSource dataSource, int streamIndex, SendStream sendStream) { this.dataSource = dataSource; this.sendStream = sendStream; this.streamIndex = streamIndex; } void close(SendStreamImpl sendStream) { boolean close = false; synchronized (this) { if (sendStreams.contains(sendStream)) { sendStreams.remove(sendStream); close = sendStreams.isEmpty(); } } if (close) RTPTranslatorImpl.this.closeSendStream(this); } public synchronized SendStreamImpl getSendStream( StreamRTPManager streamRTPManager, boolean create) { for (SendStreamImpl sendStream : sendStreams) if (sendStream.streamRTPManager == streamRTPManager) return sendStream; if (create) { SendStreamImpl sendStream = new SendStreamImpl(streamRTPManager, this); sendStreams.add(sendStream); return sendStream; } else return null; } public synchronized int getSendStreamCount() { return sendStreams.size(); } synchronized void start(SendStreamImpl sendStream) throws IOException { if (sendStreams.contains(sendStream)) { if (started < 1) { this.sendStream.start(); started = 1; } else started++; } } synchronized void stop(SendStreamImpl sendStream) throws IOException { if (sendStreams.contains(sendStream)) { if (started == 1) { this.sendStream.stop(); started = 0; } else if (started > 1) started--; } } } private static class SendStreamImpl implements SendStream { private boolean closed; public final SendStreamDesc sendStreamDesc; private boolean started; public final StreamRTPManager streamRTPManager; public SendStreamImpl( StreamRTPManager streamRTPManager, SendStreamDesc sendStreamDesc) { this.sendStreamDesc = sendStreamDesc; this.streamRTPManager = streamRTPManager; } public void close() { if (!closed) { try { if (started) stop(); } catch (IOException ioe) { throw new UndeclaredThrowableException(ioe); } finally { sendStreamDesc.close(this); closed = true; } } } public DataSource getDataSource() { return sendStreamDesc.sendStream.getDataSource(); } public Participant getParticipant() { return sendStreamDesc.sendStream.getParticipant(); } public SenderReport getSenderReport() { return sendStreamDesc.sendStream.getSenderReport(); } public TransmissionStats getSourceTransmissionStats() { return sendStreamDesc.sendStream.getSourceTransmissionStats(); } public long getSSRC() { return sendStreamDesc.sendStream.getSSRC(); } public int setBitRate(int bitRate) { // TODO Auto-generated method stub return 0; } public void setSourceDescription(SourceDescription[] sourceDescription) { // TODO Auto-generated method stub } public void start() throws IOException { if (closed) { throw new IOException( "Cannot start SendStream" + " after it has been closed."); } if (!started) { sendStreamDesc.start(this); started = true; } } public void stop() throws IOException { if (!closed && started) { sendStreamDesc.stop(this); started = false; } } } private static class StreamRTPManagerDesc { public RTPConnectorDesc connectorDesc; private final Map<Integer, Format> formats = new HashMap<Integer, Format>(); private long[] receiveSSRCs = EMPTY_LONG_ARRAY; private final List<ReceiveStreamListener> receiveStreamListeners = new LinkedList<ReceiveStreamListener>(); public final StreamRTPManager streamRTPManager; public StreamRTPManagerDesc(StreamRTPManager streamRTPManager) { this.streamRTPManager = streamRTPManager; } public void addFormat(Format format, int payloadType) { synchronized (formats) { formats.put(payloadType, format); } } public synchronized void addReceiveSSRC(long receiveSSRC) { if (!containsReceiveSSRC(receiveSSRC)) { int receiveSSRCCount = receiveSSRCs.length; long[] newReceiveSSRCs = new long[receiveSSRCCount + 1]; System.arraycopy( receiveSSRCs, 0, newReceiveSSRCs, 0, receiveSSRCCount); newReceiveSSRCs[receiveSSRCCount] = receiveSSRC; receiveSSRCs = newReceiveSSRCs; } } public void addReceiveStreamListener(ReceiveStreamListener listener) { synchronized (receiveStreamListeners) { if (!receiveStreamListeners.contains(listener)) receiveStreamListeners.add(listener); } } public Format getFormat(int payloadType) { synchronized (formats) { return formats.get(payloadType); } } public Format[] getFormats() { synchronized (this.formats) { Collection<Format> formats = this.formats.values(); return formats.toArray(new Format[formats.size()]); } } public Integer getPayloadType(Format format) { synchronized (formats) { for (Map.Entry<Integer, Format> entry : formats.entrySet()) { Format entryFormat = entry.getValue(); if (entryFormat.matches(format) || format.matches(entryFormat)) return entry.getKey(); } } return null; } public ReceiveStreamListener[] getReceiveStreamListeners() { synchronized (receiveStreamListeners) { return receiveStreamListeners.toArray( new ReceiveStreamListener[ receiveStreamListeners.size()]); } } public synchronized boolean containsReceiveSSRC(long receiveSSRC) { for (int i = 0; i < receiveSSRCs.length; i++) if (receiveSSRCs[i] == receiveSSRC) return true; return false; } public void removeReceiveStreamListener(ReceiveStreamListener listener) { synchronized (receiveStreamListeners) { receiveStreamListeners.remove(listener); } } } }