Первоначально опубликовано здесь .
Я действительно хотел больше узнать о том, как работают потоковые передачи аудио/видео. Одна часть этого мира – RTP. Это определено в RFC 3550 , который я читал, но на самом деле я ничего не понимаю, пока не увижу, что они работают на меня, так что давайте сделаем это. Это то, что мы строим:
- ffmpeg генерирует поток RTP из аудиофайла
- Приемник RTP потребляет этот поток, создавая из него поток websocket и отправляя его на сервер websocket
- Сервер websocket потребляет поток и записывает его в файл
Код для этого эксперимента можно найти по адресу https://github.com/lucaspin/spring-replication-proxy .
Структура заголовка RTP
Вот структура заголовка RTP из RFC :
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |V=2|P|X| CC |M| PT | sequence number | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | timestamp | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | synchronization source (SSRC) identifier | +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ | contributing source (CSRC) identifiers | | .... | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Здесь следует отметить несколько моментов:
версия (V)
: первые два байта в пакете. Это всегда 2, по крайней мере, до тех пор, пока кто-нибудь не придумает более новую версию RTP. Пища для размышлений: поскольку для этого используются всего 2 бита, возможно ли существование только версии 3?количество источников вклада (CC)
: работает вместе с частью заголовка идентификаторов CSRC, определяя, сколько из них у нас будет в пакете. Поскольку для этого используется только 4 бита, максимальное количество идентификаторов CSRC, которые мы можем иметь, равно 15.тип полезной нагрузки (PT)
: формат, который использует носитель в этом пакете. Это имеет смысл только в контексте профиля. RFC 3551 определяет хороший пример одного из них. Профиль определяет, какие значения соответствуют каким форматам. Тип полезной нагрузки может быть статическим или динамическим. Для статического типа полезной нагрузки вам просто нужно знать, какой профиль использует отправитель, и все в порядке. Для динамического типа полезной нагрузки вам нужен профиль и что-то еще, чтобы знать формат. Обычно описание SDP – это “что-то еще”.порядковый номер
: отправитель присваивает порядковые номера всем пакетам перед их отправкой, чтобы получатель мог перекомпилировать медиапакеты в правильном порядке. Позже вы сможете услышать, почему это так важно.временная метка
: Зачем вам нужен порядковый номер и временная метка?Идентификатор SSRC
: источник синхронизации; уникальный идентификатор, используемый для различения медиапотоков, поступающих из разных источников.Идентификаторы CSRC
: способствующие источники; способ переводчиков RTP сохранять исходные источники, которые были смешаны в одном и том же медиапотоке.
/| P (заполнение),
Поля X (расширение) и
M (маркер) являются флагами, указывающими на изменения в структуре пакета, и сейчас они для нас не очень важны. Таким образом, заголовок RTP может быть определен следующим образом:
public class RTPPacket { private final int version; private final boolean padding; private final boolean extension; private final int contributingSourcesCount; private final boolean marker; private final int payloadType; private final int sequenceNumber; private final int timestamp; private final int synchronizationSourceId; private final byte[] payload; }
Синтаксический анализ заголовка
Если вы похожи на меня и не владеете свободно двоичным кодом, вам понадобятся небольшие напоминания о том, как манипулировать битами, чтобы получить нужную вам информацию.
Например, мы хотим получить версию пакета. Мы знаем, что это поле принимает первые два бита первого байта пакета. Но первый байт также содержит три других поля. Как же тогда мы получим только эту версию? Используя побитовые операции И и SHIFT .
Допустим, что первый байт равен 10010010
. Чтобы захватить только первые два крайних левых бита, нам нужно “стереть” остальные шесть битов. Чтобы сделать это, мы можем использовать операцию AND, где биты в том же положении, что и те, которые мы хотим стереть, равны 0:
10010010 AND 11000000 ------------ 10000000
Помните: добавление чего-либо с 0 дает вам 0. Теперь мы обнулили их, но нужные нам биты все еще находятся в крайнем левом положении. Теперь мы сдвигаем их вправо 6 раз:
10000000 -------- (1x) 01000000 (2x) 00100000 (3x) 00010000 (4x) 00001000 (5x) 00000100 (6x) 00000010
Теперь у нас есть 00000010
, или 2 в фантастической десятичной системе счисления. Имея в виду эту идею, давайте создадим метод parsePacket() для преобразования массива байтов в объект пакета RTP:
public static RTPPacket parsePacket(byte[] packet) { return RTPPacket.builder() .version((packet[0] & 0b11000000) >>> 6) .padding(((packet[0] & 0b00100000) >> 5) == 1) .extension(((packet[0] & 0b00010000) >> 4) == 1) .contributingSourcesCount(packet[0] & 0b00001111) .marker(((packet[1] & 0b10000000) >> 7) == 1) .payloadType(packet[1] & 0b01111111) .sequenceNumber(ByteBuffer.wrap(packet, 2, 2).getShort()) .timestamp(ByteBuffer.wrap(packet, 4, 4).getInt()) .synchronizationSourceId(ByteBuffer.wrap(packet, 8, 4).getInt()) .payload(Arrays.copyOfRange(packet, 12, packet.length)) .build(); }
Входящий приемник UDP
Поскольку мы заинтересованы только в понимании RTP, давайте позволим Spring позаботиться о сложности UDP для нас. Spring поддерживает TCP и UDP и вот как вы создаете с его помощью входящий приемник UDP на порту 11111:
IntegrationFlows.from(new UnicastReceivingChannelAdapter(11111)) .handle(new RTPMessageHandler(rtpManager)) .get();
Класс Обработчик сообщений RTP
расширяет Spring’s AbstractMessageHandler и выполняет всего две вещи:
- Анализирует пакет RTP
- Вперед это нашему RTPManager
public class RTPMessageHandler extends AbstractMessageHandler { private final RTPManager rtpManager; @Override protected void handleMessageInternal(Message> message) { RTPPacket packet = parsePacket((byte[]) message.getPayload()); rtpManager.onPacketReceived(packet); } }
RTPManager
отвечает за очень фундаментальную часть приемника RTP, использующего UDP: переупорядочение пакетов.
Изменение порядка пакетов
RTP может использоваться через TCP (менее распространенный) или UDP (более распространенный). Поскольку он не зависит от правильности заказа доставки, которая может быть обеспечена транспортным протоколом, ему нужен способ упорядочивания пакетов. И именно поэтому в заголовке RTP указан порядковый номер. Приемник RTP, который будет воспроизводить медиапоток, должен изменить порядок полученных пакетов, в противном случае носитель будет звучать/выглядеть не очень красиво. В нашем примере мы не будем воспроизводить его, но это сделает потребитель WebSocket, поэтому нам нужно изменить порядок пакетов для них.
RTPManager
будет отвечать за две вещи:
- Изменение порядка пакетов перед отправкой их через websocket
- Группировка входящих пакетов по идентификатору SSRC
public class RTPManager { private final MapsyncSources = new HashMap<>(); public synchronized void onPacketReceived(RTPPacket packet) { if (syncSources.containsKey(packet.getSynchronizationSourceId())) { SyncSourceStatus status = syncSources.get(packet.getSynchronizationSourceId()); synchronized (status.getLock()) { status.addPacket(packet); if (status.getPackets().size() > MAX_PACKETS_BEFORE_FLUSHING) { status.flush(); } } } else { syncSources.put(packet.getSynchronizationSourceId(), SyncSourceStatus.builder() .syncSourceId(packet.getSynchronizationSourceId()) .packets(new ArrayList<>(List.of(packet))) .webSocket(initializeSocket()) .lock(new Object()) .build()); } } }
Здесь я использую другой класс с именем Sync Source Status
, чтобы помочь мне с этими вещами:
static class SyncSourceStatus { private int syncSourceId; private Listpackets; private Socket webSocket; private final Object lock; public void addPacket(RTPPacket packet) { packets.add(packet); } public void flush() { packets = packets.stream().sorted().collect(Collectors.toList()); packets.forEach(packet -> webSocket.send(packet.getPayload())); packets = new ArrayList<>(); } }
Обратите внимание, что Статус источника синхронизации.flush()
сортирует пакеты перед отправкой их через websocket, поэтому нам нужно сделать наш RTPPacket
реализовать Сопоставимый
:
public class RTPPacket implements Comparable{ // Fields and getters @Override public int compareTo(RTPPacket o) { return Integer.compare(getSequenceNumber(), o.getSequenceNumber()); } }
Поднимая все вверх
Для нашего сервера и клиента WebSocket мы будем использовать socket.io . Наш сервер просто захватывает все, что поступает из потока websocket, и сохраняет это в файл:
const http = require('http').Server(); const fs = require('fs'); const io = require('socket.io')(http); io.on('connection', (socket) => { const wstream = fs.createWriteStream('/tmp/audio-from-ffmpeg'); socket.on('disconnect', () => { wstream.end(); }); socket.on('message', msg => { wstream.write(Buffer.from(msg)); }); }); http.listen(4010, () => { console.log('listening on *:4010'); });
И мы будем использовать ffmpeg для генерации потока RTP, который будет использоваться нашим приемником RTP:
ffmpeg \ -re \ -i media/pcm_s16le-44100hz-s16-10s.wav \ -c:a copy \ -f rtp \ "rtp://127.0.0.1:11111"
Как только команда ffmpeg завершится, мы сможем проверить аудио, записанное сервером в /tmp/audio-from-ffmpeg
, и убедиться, что оно воспроизводится хорошо.
Изменить порядок или нет чтобы изменить порядок
Чтобы проиллюстрировать, почему изменение порядка пакетов важно:
- Медиапоток реплицируется с изменением порядка пакетов: https://lucaspin.github.io/public/media/reordering-yes.ogg
- Медиапоток реплицируется без изменения порядка пакетов: https://lucaspin.github.io/public/media/reordering-no.ogg
Большая разница, да? Вот почему вам нужно поле порядкового номера, потому что получателю RTP нужен способ размещать пакеты, поступающие в том же порядке, в котором они были отправлены.
Это все на сегодня, спасибо за чтение!
Оригинал: “https://dev.to/lucaspin/building-an-rtp-proxy-using-spring-2g7j”