-
Lyubomir Marinov authoredLyubomir Marinov authored
RTPConnectorInputStream.java 12.94 KiB
/*
* Jitsi, the OpenSource Java VoIP and Instant Messaging client.
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jitsi.impl.neomedia;
import java.io.*;
import java.net.*;
import javax.media.protocol.*;
import org.ice4j.socket.*;
import org.jitsi.service.libjitsi.*;
import org.jitsi.service.packetlogging.*;
import org.jitsi.util.*;
/**
* @author Bing SU (nova.su@gmail.com)
* @author Lyubomir Marinov
*/
public abstract class RTPConnectorInputStream
implements PushSourceStream,
Runnable
{
/**
* The value of the property <tt>controls</tt> of
* <tt>RTPConnectorInputStream</tt> when there are no controls. Explicitly
* defined in order to reduce unnecessary allocations.
*/
private static final Object[] EMPTY_CONTROLS = new Object[0];
/**
* The length in bytes of the buffers of <tt>RTPConnectorInputStream</tt>
* receiving packets from the network.
*/
public static final int PACKET_RECEIVE_BUFFER_LENGTH = 4 * 1024;
/**
* Packet receive buffer
*/
private final byte[] buffer = new byte[PACKET_RECEIVE_BUFFER_LENGTH];
/**
* Whether this stream is closed. Used to control the termination of worker
* thread.
*/
protected boolean closed;
/**
* Caught an IO exception during read from socket
*/
protected boolean ioError = false;
/**
* The packet data to be read out of this instance through its
* {@link #read(byte[], int, int)} method.
*/
protected RawPacket pkt;
/**
* SourceTransferHandler object which is used to read packets.
*/
private SourceTransferHandler transferHandler;
/**
* The Thread receiving packets.
*/
protected Thread receiverThread = null;
/**
* The <tt>DatagramPacketFilter</tt>s which allow dropping
* <tt>DatagramPacket</tt>s before they are converted into
* <tt>RawPacket</tt>s.
*/
private DatagramPacketFilter[] datagramPacketFilters;
/**
* Initializes a new <tt>RTPConnectorInputStream</tt> which is to receive
* packet data from a specific UDP socket.
*/
public RTPConnectorInputStream()
{
// PacketLoggingService
addDatagramPacketFilter(
new DatagramPacketFilter()
{
/**
* Used for debugging. As we don't log every packet, we must
* count them and decide which to log.
*/
private long numberOfPackets = 0;
public boolean accept(DatagramPacket p)
{
numberOfPackets++;
if (RTPConnectorOutputStream.logPacket(numberOfPackets))
{
PacketLoggingService packetLogging
= LibJitsi.getPacketLoggingService();
if ((packetLogging != null)
&& packetLogging.isLoggingEnabled(
PacketLoggingService.ProtocolName
.RTP))
doLogPacket(p);
}
return true;
}
});
}
/**
* Close this stream, stops the worker thread.
*/
public synchronized void close()
{
}
/**
* Creates a new <tt>RawPacket</tt> from a specific <tt>DatagramPacket</tt>
* in order to have this instance receive its packet data through its
* {@link #read(byte[], int, int)} method. Allows extenders to intercept the
* packet data and possibly filter and/or modify it.
*
* @param datagramPacket the <tt>DatagramPacket</tt> containing the packet
* data
* @return a new <tt>RawPacket</tt> containing the packet data of the
* specified <tt>DatagramPacket</tt> or possibly its modification;
* <tt>null</tt> to ignore the packet data of the specified
* <tt>DatagramPacket</tt> and not make it available to this instance
* through its {@link #read(byte[], int, int)} method
*/
protected RawPacket createRawPacket(DatagramPacket datagramPacket)
{
if (pkt == null)
{
return
new RawPacket(
datagramPacket.getData(),
datagramPacket.getOffset(),
datagramPacket.getLength());
}
pkt.setBuffer(datagramPacket.getData());
pkt.setLength(datagramPacket.getLength());
pkt.setOffset(datagramPacket.getOffset());
return pkt;
}
/**
* Provides a dummy implementation to {@link
* RTPConnectorInputStream#endOfStream()} that always returns
* <tt>false</tt>.
*
* @return <tt>false</tt>, no matter what.
*/
public boolean endOfStream()
{
return false;
}
/**
* Provides a dummy implementation to {@link
* RTPConnectorInputStream#getContentDescriptor()} that always returns
* <tt>null</tt>.
*
* @return <tt>null</tt>, no matter what.
*/
public ContentDescriptor getContentDescriptor()
{
return null;
}
/**
* Provides a dummy implementation to {@link
* RTPConnectorInputStream#getContentLength()} that always returns
* <tt>LENGTH_UNKNOWN</tt>.
*
* @return <tt>LENGTH_UNKNOWN</tt>, no matter what.
*/
public long getContentLength()
{
return pkt.getLength();
}
/**
* Provides a dummy implementation to {@link
* RTPConnectorInputStream#getControl(String)} that always returns
* <tt>null</tt>.
*
* @param controlType ignored.
*
* @return <tt>null</tt>, no matter what.
*/
public Object getControl(String controlType)
{
return null;
}
/**
* Provides a dummy implementation to {@link
* RTPConnectorInputStream#getControls()} that always returns
* <tt>EMPTY_CONTROLS</tt>.
*
* @return <tt>EMPTY_CONTROLS</tt>, no matter what.
*/
public Object[] getControls()
{
return EMPTY_CONTROLS;
}
/**
* Provides a dummy implementation to {@link
* RTPConnectorInputStream#getMinimumTransferSize()} that always returns
* <tt>2 * 1024</tt>.
*
* @return <tt>2 * 1024</tt>, no matter what.
*/
public int getMinimumTransferSize()
{
return 2 * 1024; // twice the MTU size, just to be safe.
}
/**
* Copies the content of the most recently received packet into
* <tt>buffer</tt>.
*
* @param buffer the <tt>byte[]</tt> that we'd like to copy the content of
* the packet to.
* @param offset the position where we are supposed to start writing in
* <tt>buffer</tt>.
* @param length the number of <tt>byte</tt>s available for writing in
* <tt>buffer</tt>.
*
* @return the number of bytes read
*
* @throws IOException if <tt>length</tt> is less than the size of the
* packet.
*/
public int read(byte[] buffer, int offset, int length)
throws IOException
{
if (ioError)
return -1;
int pktLength = pkt.getLength();
if (length < pktLength)
{
throw new IOException(
"Input buffer not big enough for " + pktLength);
}
System.arraycopy(
pkt.getBuffer(), pkt.getOffset(),
buffer, offset,
pktLength);
return pktLength;
}
/**
* Log the packet.
*
* @param packet packet to log
*/
protected abstract void doLogPacket(DatagramPacket packet);
/**
* Receive packet.
*
* @param p packet for receiving
* @throws IOException if something goes wrong during receiving
*/
protected abstract void receivePacket(DatagramPacket p)
throws IOException;
/**
* Listens for incoming datagrams, stores them for reading by the
* <tt>read</tt> method and notifies the local <tt>transferHandler</tt>
* that there's data to be read.
*/
public void run()
{
DatagramPacket p
= new DatagramPacket(buffer, 0, PACKET_RECEIVE_BUFFER_LENGTH);
while (!closed)
{
try
{
// http://code.google.com/p/android/issues/detail?id=24765
if (OSUtils.IS_ANDROID)
p.setLength(PACKET_RECEIVE_BUFFER_LENGTH);
receivePacket(p);
}
catch (IOException e)
{
ioError = true;
break;
}
/*
* Do the DatagramPacketFilters accept the received DatagramPacket?
*/
DatagramPacketFilter[] datagramPacketFilters
= getDatagramPacketFilters();
boolean accept;
if (datagramPacketFilters == null)
accept = true;
else
{
accept = true;
for (int i = 0; i < datagramPacketFilters.length; i++)
{
try
{
if (!datagramPacketFilters[i].accept(p))
{
accept = false;
break;
}
}
catch (Throwable t)
{
if (t instanceof ThreadDeath)
throw (ThreadDeath) t;
}
}
}
if (accept)
{
pkt = createRawPacket(p);
/*
* If we got extended, the delivery of the packet may have been
* canceled.
*/
if ((pkt != null) && (!pkt.isInvalid())
&& (transferHandler != null) && !closed)
transferHandler.transferData(this);
}
}
}
/**
* Sets the <tt>transferHandler</tt> that this connector should be notifying
* when new data is available for reading.
*
* @param transferHandler the <tt>transferHandler</tt> that this connector
* should be notifying when new data is available for reading.
*/
public void setTransferHandler(SourceTransferHandler transferHandler)
{
if (!closed)
this.transferHandler = transferHandler;
}
/**
* Changes current thread priority.
* @param priority the new priority.
*/
public void setPriority(int priority)
{
// currently no priority is set
// if (receiverThread != null)
// receiverThread.setPriority(priority);
}
/**
* Gets the <tt>DatagramPacketFilter</tt>s which allow dropping
* <tt>DatagramPacket</tt>s before they are converted into
* <tt>RawPacket</tt>s.
*
* @return the <tt>DatagramPacketFilter</tt>s which allow dropping
* <tt>DatagramPacket</tt>s before they are converted into
* <tt>RawPacket</tt>s.
*/
public synchronized DatagramPacketFilter[] getDatagramPacketFilters()
{
return datagramPacketFilters;
}
/**
* Adds a <tt>DatagramPacketFilter</tt> which allows dropping
* <tt>DatagramPacket</tt>s before they are converted into
* <tt>RawPacket</tt>s.
*
* @param datagramPacketFilter the <tt>DatagramPacketFilter</tt> which
* allows dropping <tt>DatagramPacket</tt>s before they are converted into
* <tt>RawPacket</tt>s
*/
public synchronized void addDatagramPacketFilter(
DatagramPacketFilter datagramPacketFilter)
{
if (datagramPacketFilter == null)
throw new NullPointerException("datagramPacketFilter");
if (datagramPacketFilters == null)
{
datagramPacketFilters
= new DatagramPacketFilter[] { datagramPacketFilter };
}
else
{
final int length = datagramPacketFilters.length;
for (int i = 0; i < length; i++)
if (datagramPacketFilter.equals(datagramPacketFilters[i]))
return;
DatagramPacketFilter[] newDatagramPacketFilters
= new DatagramPacketFilter[length + 1];
System.arraycopy(
datagramPacketFilters, 0,
newDatagramPacketFilters, 0,
length);
newDatagramPacketFilters[length] = datagramPacketFilter;
datagramPacketFilters = newDatagramPacketFilters;
}
}
}