/* * Jitsi, the OpenSource Java VoIP and Instant Messaging client. * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jitsi.impl.neomedia; import java.io.*; import java.net.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; import javax.media.rtp.*; import org.jitsi.service.libjitsi.*; import org.jitsi.service.packetlogging.*; import org.jitsi.util.*; /** * @author Bing SU (nova.su@gmail.com) * @author Lyubomir Marinov */ public abstract class RTPConnectorOutputStream implements OutputDataStream { /** * The <tt>Logger</tt> used by the <tt>RTPConnectorOutputStream</tt> class * and its instances for logging output. */ private static final Logger logger = Logger.getLogger(RTPConnectorOutputStream.class); /** * The maximum number of packets to be sent to be kept in the queue of * <tt>MaxPacketsPerMillisPolicy</tt>. When the maximum is reached, the next * attempt to write a new packet in the queue will block until at least one * packet from the queue is sent. Defined in order to prevent * <tt>OutOfMemoryError</tt>s which, technically, may arise if the capacity * of the queue is unlimited. */ public static final int MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY = 256; /** * The functionality which allows this <tt>OutputDataStream</tt> to control * how many RTP packets it sends through its <tt>DatagramSocket</tt> per a * specific number of milliseconds. */ private MaxPacketsPerMillisPolicy maxPacketsPerMillisPolicy; /** * Stream targets' IP addresses and ports. */ protected final List<InetSocketAddress> targets = new LinkedList<InetSocketAddress>(); /** * The pool of <tt>RawPacket</tt> instances which reduces the number of * allocations performed by {@link #createRawPacket(byte[], int, int)}. */ private final LinkedBlockingQueue<RawPacket> rawPacketPool = new LinkedBlockingQueue<RawPacket>(); /** * Used for debugging. As we don't log every packet * we must count them and decide which to log. */ private long numberOfPackets = 0; /** * Initializes a new <tt>RTPConnectorOutputStream</tt> which is to send * packet data out through a specific socket. */ public RTPConnectorOutputStream() { } /** * Add a target to stream targets list * * @param remoteAddr target ip address * @param remotePort target port */ public void addTarget(InetAddress remoteAddr, int remotePort) { InetSocketAddress target = new InetSocketAddress(remoteAddr, remotePort); if (!targets.contains(target)) targets.add(target); } /** * Close this output stream. */ public void close() { if (maxPacketsPerMillisPolicy != null) { maxPacketsPerMillisPolicy.close(); maxPacketsPerMillisPolicy = null; } removeTargets(); } /** * Creates a new <tt>RawPacket</tt> from a specific <tt>byte[]</tt> buffer * in order to have this instance send its packet data through its * {@link #write(byte[], int, int)} method. Allows extenders to intercept * the packet data and possibly filter and/or modify it. * * @param buffer the packet data to be sent to the targets of this instance * @param offset the offset of the packet data in <tt>buffer</tt> * @param length the length of the packet data in <tt>buffer</tt> * @return a new <tt>RawPacket</tt> containing the packet data of the * specified <tt>byte[]</tt> buffer or possibly its modification; * <tt>null</tt> to ignore the packet data of the specified <tt>byte[]</tt> * buffer and not send it to the targets of this instance through its * {@link #write(byte[], int, int)} method */ protected RawPacket createRawPacket(byte[] buffer, int offset, int length) { RawPacket pkt = rawPacketPool.poll(); if ((pkt == null) || (pkt.getBuffer().length < length)) pkt = new RawPacket(new byte[length], 0, 0); System.arraycopy(buffer, offset, pkt.getBuffer(), 0, length); pkt.setLength(length); pkt.setOffset(0); return pkt; } /** * Remove a target from stream targets list * * @param remoteAddr target ip address * @param remotePort target port * @return <tt>true</tt> if the target is in stream target list and can be * removed; <tt>false</tt>, otherwise */ public boolean removeTarget(InetAddress remoteAddr, int remotePort) { for (Iterator<InetSocketAddress> targetIter = targets.iterator(); targetIter.hasNext();) { InetSocketAddress target = targetIter.next(); if (target.getAddress().equals(remoteAddr) && (target.getPort() == remotePort)) { targetIter.remove(); return true; } } return false; } /** * Remove all stream targets from this session. */ public void removeTargets() { targets.clear(); } /** * Determines whether a <tt>RawPacket</tt> which has a specific number in * the total number of sent <tt>RawPacket</tt>s is to be logged by * {@link PacketLoggingService}. * * @param numOfPacket the number of the <tt>RawPacket</tt> in the total * number of sent <tt>RawPacket</tt>s * @return <tt>true</tt> if the <tt>RawPacket</tt> with the specified * <tt>numOfPacket</tt> is to be logged by <tt>PacketLoggingService</tt>; * otherwise, <tt>false</tt> */ static boolean logPacket(long numOfPacket) { return (numOfPacket == 1) || (numOfPacket == 300) || (numOfPacket == 500) || (numOfPacket == 1000) || ((numOfPacket % 5000) == 0); } /** * Sends a specific <tt>RawPacket</tt> through this * <tt>OutputDataStream</tt> to a specific <tt>InetSocketAddress</tt>. * * @param packet the <tt>RawPacket</tt> to send through this * <tt>OutputDataStream</tt> to the specified <tt>target</tt> * @param target the <tt>InetSocketAddress</tt> to which the specified * <tt>packet</tt> is to be sent through this <tt>OutputDataStream</tt> * @throws IOException if anything goes wrong while sending the specified * <tt>packet</tt> through this <tt>OutputDataStream</tt> to the specified * <tt>target</tt> */ protected abstract void sendToTarget( RawPacket packet, InetSocketAddress target) throws IOException; /** * Logs a specific <tt>RawPacket</tt> associated with a specific remote * address. * * @param packet packet to log * @param target the remote address associated with the <tt>packet</tt> */ protected abstract void doLogPacket( RawPacket packet, InetSocketAddress target); /** * Returns whether or not this <tt>RTPConnectorOutputStream</tt> has a valid * socket. * * @return <tt>true</tt> if this <tt>RTPConnectorOutputStream</tt> has a * valid socket; <tt>false</tt>, otherwise */ protected abstract boolean isSocketValid(); /** * Sends a specific RTP packet through the <tt>DatagramSocket</tt> of this * <tt>OutputDataSource</tt>. * * @param packet the RTP packet to be sent through the * <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt> * @return <tt>true</tt> if the specified <tt>packet</tt> was successfully * sent; otherwise, <tt>false</tt> */ private boolean send(RawPacket packet) { if(!isSocketValid()) return false; numberOfPackets++; for (InetSocketAddress target : targets) { try { sendToTarget(packet, target); if(logPacket(numberOfPackets)) { PacketLoggingService packetLogging = LibJitsi.getPacketLoggingService(); if ((packetLogging != null) && packetLogging.isLoggingEnabled( PacketLoggingService.ProtocolName.RTP)) doLogPacket(packet, target); } } catch (IOException ioe) { rawPacketPool.offer(packet); // TODO error handling return false; } } rawPacketPool.offer(packet); return true; } /** * Sets the maximum number of RTP packets to be sent by this * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per * a specific number of milliseconds. * * @param maxPackets the maximum number of RTP packets to be sent by this * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per the * specified number of milliseconds; <tt>-1</tt> if no maximum is to be set * @param perMillis the number of milliseconds per which <tt>maxPackets</tt> * are to be sent by this <tt>OutputDataStream</tt> through its * <tt>DatagramSocket</tt> */ public void setMaxPacketsPerMillis(int maxPackets, long perMillis) { if (maxPacketsPerMillisPolicy == null) { if (maxPackets > 0) { if (perMillis < 1) throw new IllegalArgumentException("perMillis"); maxPacketsPerMillisPolicy = new MaxPacketsPerMillisPolicy(maxPackets, perMillis); } } else { maxPacketsPerMillisPolicy .setMaxPacketsPerMillis(maxPackets, perMillis); } } /** * Implements {@link OutputDataStream#write(byte[], int, int)}. * * @param buffer the <tt>byte[]</tt> that we'd like to copy the content * of the packet to. * @param offset the position where we are supposed to start writing in * <tt>buffer</tt>. * @param length the number of <tt>byte</tt>s available for writing in * <tt>inBuffer</tt>. * * @return the number of bytes read */ public int write(byte[] buffer, int offset, int length) { /* * While calling write without targets can be carried out without a * problem, such a situation may be a symptom of a problem. For example, * it was discovered during testing that RTCP was seemingly-endlessly * sent after hanging up a call. */ if (logger.isDebugEnabled() && targets.isEmpty()) logger.debug("Write called without targets!", new Throwable()); RawPacket packet = createRawPacket(buffer, offset, length); /* * If we got extended, the delivery of the packet may have been * canceled. */ if (packet != null) { if (maxPacketsPerMillisPolicy == null) { if (!send(packet)) return -1; } else maxPacketsPerMillisPolicy.write(packet); } return length; } /** * Changes current thread priority. * @param priority the new priority. */ public void setPriority(int priority) { // currently no priority is set // if ((maxPacketsPerMillisPolicy != null) // && (maxPacketsPerMillisPolicy.sendThread != null)) // maxPacketsPerMillisPolicy.sendThread.setPriority(priority); } /** * Implements the functionality which allows this <tt>OutputDataStream</tt> * to control how many RTP packets it sends through its * <tt>DatagramSocket</tt> per a specific number of milliseconds. */ private class MaxPacketsPerMillisPolicy { /** * The maximum number of RTP packets to be sent by this * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per * {@link #perNanos} nanoseconds. */ private int maxPackets = -1; /** * The time stamp in nanoseconds of the start of the current * <tt>perNanos</tt> interval. */ private long millisStartTime = 0; /** * The list of RTP packets to be sent through the * <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>. */ private final ArrayBlockingQueue<RawPacket> packetQueue = new ArrayBlockingQueue<RawPacket>( MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY); /** * The number of RTP packets already sent during the current * <tt>perNanos</tt> interval. */ private long packetsSentInMillis = 0; /** * The time interval in nanoseconds during which {@link #maxPackets} * number of RTP packets are to be sent through the * <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>. */ private long perNanos = -1; /** * The <tt>Thread</tt> which is to send the RTP packets in * {@link #packetQueue} through the <tt>DatagramSocket</tt> of this * <tt>OutputDataSource</tt>. */ private Thread sendThread; /** * To signal run or stop condition to send thread. */ private boolean sendRun = true; /** * Initializes a new <tt>MaxPacketsPerMillisPolicy</tt> instance which * is to control how many RTP packets this <tt>OutputDataSource</tt> is * to send through its <tt>DatagramSocket</tt> per a specific number of * milliseconds. * * @param maxPackets the maximum number of RTP packets to be sent per * <tt>perMillis</tt> milliseconds through the <tt>DatagramSocket</tt> * of this <tt>OutputDataStream</tt> * @param perMillis the number of milliseconds per which a maximum of * <tt>maxPackets</tt> RTP packets are to be sent through the * <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt> */ public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis) { setMaxPacketsPerMillis(maxPackets, perMillis); synchronized (this) { if (sendThread == null) { sendThread = new Thread(getClass().getName()) { @Override public void run() { runInSendThread(); } }; sendThread.setDaemon(true); sendThread.start(); } } } /** * Closes the connector. */ synchronized void close() { if (!sendRun) return; sendRun = false; // just offer a new packet to wakeup thread in case it waits for // a packet. packetQueue.offer(new RawPacket(null, 0, 0)); } /** * Sends the RTP packets in {@link #packetQueue} in accord with * {@link #maxPackets} and {@link #perNanos}. */ private void runInSendThread() { try { while (sendRun) { RawPacket packet = null; while (true) { try { packet = packetQueue.take(); break; } catch (InterruptedException iex) { continue; } } if (!sendRun) break; long time = System.nanoTime(); long millisRemainingTime = time - millisStartTime; if ((perNanos < 1) || (millisRemainingTime >= perNanos)) { millisStartTime = time; packetsSentInMillis = 0; } else if ((maxPackets > 0) && (packetsSentInMillis >= maxPackets)) { while (true) { millisRemainingTime = System.nanoTime() - millisStartTime; if (millisRemainingTime >= perNanos) break; LockSupport.parkNanos(millisRemainingTime); } millisStartTime = System.nanoTime(); packetsSentInMillis = 0; } send(packet); packetsSentInMillis++; } } finally { packetQueue.clear(); synchronized (packetQueue) { if (Thread.currentThread().equals(sendThread)) sendThread = null; } } } /** * Sets the maximum number of RTP packets to be sent by this * <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per * a specific number of milliseconds. * * @param maxPackets the maximum number of RTP packets to be sent by * this <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> * per the specified number of milliseconds; <tt>-1</tt> if no maximum * is to be set * @param perMillis the number of milliseconds per which * <tt>maxPackets</tt> are to be sent by this <tt>OutputDataStream</tt> * through its <tt>DatagramSocket</tt> */ public void setMaxPacketsPerMillis(int maxPackets, long perMillis) { if (maxPackets < 1) { this.maxPackets = -1; this.perNanos = -1; } else { if (perMillis < 1) throw new IllegalArgumentException("perMillis"); this.maxPackets = maxPackets; this.perNanos = perMillis * 1000000; } } /** * Queues a specific RTP packet to be sent through the * <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>. * * @param packet the RTP packet to be queued for sending through the * <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt> */ public void write(RawPacket packet) { while (true) { try { packetQueue.put(packet); break; } catch (InterruptedException iex) { } } } } }