Files
scrcpy/app/src/stream.c
Romain Vimont 3ffea72fa6 Remove option --render-expired-frames
This flag forced the decoder to wait for the previous frame to be
consumed by the display.

It was initially implemented as a compilation flag for testing, not
intended to be exposed at runtime. But to remove ifdefs and to allow
users to test this flag easily, it had finally been exposed by commit
ebccb9f6cc.

In practice, it turned out to be useless: it had no practical impact,
and it did not solve or mitigate any performance issues causing frame
skipping.

But that added some complexity to the codebase: it required an
additional condition variable, and made video buffer calls possibly
blocking, which in turn required code to interrupt it on exit.

To prepare support for multiple sinks plugged to the decoder (display
and v4l2 for example), the blocking call used for pacing the decoder
output becomes unacceptable, so just remove this useless "feature".
2021-04-11 15:01:05 +02:00

292 lines
7.6 KiB
C

#include "stream.h"
#include <assert.h>
#include <libavformat/avformat.h>
#include <libavutil/time.h>
#include <SDL2/SDL_events.h>
#include <unistd.h>
#include "decoder.h"
#include "events.h"
#include "recorder.h"
#include "util/buffer_util.h"
#include "util/log.h"
#define BUFSIZE 0x10000
#define HEADER_SIZE 12
#define NO_PTS UINT64_C(-1)
static bool
stream_recv_packet(struct stream *stream, AVPacket *packet) {
// The video stream contains raw packets, without time information. When we
// record, we retrieve the timestamps separately, from a "meta" header
// added by the server before each raw packet.
//
// The "meta" header length is 12 bytes:
// [. . . . . . . .|. . . .]. . . . . . . . . . . . . . . ...
// <-------------> <-----> <-----------------------------...
// PTS packet raw packet
// size
//
// It is followed by <packet_size> bytes containing the packet/frame.
uint8_t header[HEADER_SIZE];
ssize_t r = net_recv_all(stream->socket, header, HEADER_SIZE);
if (r < HEADER_SIZE) {
return false;
}
uint64_t pts = buffer_read64be(header);
uint32_t len = buffer_read32be(&header[8]);
assert(pts == NO_PTS || (pts & 0x8000000000000000) == 0);
assert(len);
if (av_new_packet(packet, len)) {
LOGE("Could not allocate packet");
return false;
}
r = net_recv_all(stream->socket, packet->data, len);
if (r < 0 || ((uint32_t) r) < len) {
av_packet_unref(packet);
return false;
}
packet->pts = pts != NO_PTS ? (int64_t) pts : AV_NOPTS_VALUE;
return true;
}
static void
notify_stopped(void) {
SDL_Event stop_event;
stop_event.type = EVENT_STREAM_STOPPED;
SDL_PushEvent(&stop_event);
}
static bool
process_config_packet(struct stream *stream, AVPacket *packet) {
if (stream->recorder && !recorder_push(stream->recorder, packet)) {
LOGE("Could not send config packet to recorder");
return false;
}
return true;
}
static bool
process_frame(struct stream *stream, AVPacket *packet) {
if (stream->decoder && !decoder_push(stream->decoder, packet)) {
return false;
}
if (stream->recorder) {
packet->dts = packet->pts;
if (!recorder_push(stream->recorder, packet)) {
LOGE("Could not send packet to recorder");
return false;
}
}
return true;
}
static bool
stream_parse(struct stream *stream, AVPacket *packet) {
uint8_t *in_data = packet->data;
int in_len = packet->size;
uint8_t *out_data = NULL;
int out_len = 0;
int r = av_parser_parse2(stream->parser, stream->codec_ctx,
&out_data, &out_len, in_data, in_len,
AV_NOPTS_VALUE, AV_NOPTS_VALUE, -1);
// PARSER_FLAG_COMPLETE_FRAMES is set
assert(r == in_len);
(void) r;
assert(out_len == in_len);
if (stream->parser->key_frame == 1) {
packet->flags |= AV_PKT_FLAG_KEY;
}
bool ok = process_frame(stream, packet);
if (!ok) {
LOGE("Could not process frame");
return false;
}
return true;
}
static bool
stream_push_packet(struct stream *stream, AVPacket *packet) {
bool is_config = packet->pts == AV_NOPTS_VALUE;
// A config packet must not be decoded immetiately (it contains no
// frame); instead, it must be concatenated with the future data packet.
if (stream->has_pending || is_config) {
size_t offset;
if (stream->has_pending) {
offset = stream->pending.size;
if (av_grow_packet(&stream->pending, packet->size)) {
LOGE("Could not grow packet");
return false;
}
} else {
offset = 0;
if (av_new_packet(&stream->pending, packet->size)) {
LOGE("Could not create packet");
return false;
}
stream->has_pending = true;
}
memcpy(stream->pending.data + offset, packet->data, packet->size);
if (!is_config) {
// prepare the concat packet to send to the decoder
stream->pending.pts = packet->pts;
stream->pending.dts = packet->dts;
stream->pending.flags = packet->flags;
packet = &stream->pending;
}
}
if (is_config) {
// config packet
bool ok = process_config_packet(stream, packet);
if (!ok) {
return false;
}
} else {
// data packet
bool ok = stream_parse(stream, packet);
if (stream->has_pending) {
// the pending packet must be discarded (consumed or error)
stream->has_pending = false;
av_packet_unref(&stream->pending);
}
if (!ok) {
return false;
}
}
return true;
}
static int
run_stream(void *data) {
struct stream *stream = data;
AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_H264);
if (!codec) {
LOGE("H.264 decoder not found");
goto end;
}
stream->codec_ctx = avcodec_alloc_context3(codec);
if (!stream->codec_ctx) {
LOGC("Could not allocate codec context");
goto end;
}
if (stream->decoder && !decoder_open(stream->decoder, codec)) {
LOGE("Could not open decoder");
goto finally_free_codec_ctx;
}
if (stream->recorder) {
if (!recorder_open(stream->recorder, codec)) {
LOGE("Could not open recorder");
goto finally_close_decoder;
}
if (!recorder_start(stream->recorder)) {
LOGE("Could not start recorder");
goto finally_close_recorder;
}
}
stream->parser = av_parser_init(AV_CODEC_ID_H264);
if (!stream->parser) {
LOGE("Could not initialize parser");
goto finally_stop_and_join_recorder;
}
// We must only pass complete frames to av_parser_parse2()!
// It's more complicated, but this allows to reduce the latency by 1 frame!
stream->parser->flags |= PARSER_FLAG_COMPLETE_FRAMES;
for (;;) {
AVPacket packet;
bool ok = stream_recv_packet(stream, &packet);
if (!ok) {
// end of stream
break;
}
ok = stream_push_packet(stream, &packet);
av_packet_unref(&packet);
if (!ok) {
// cannot process packet (error already logged)
break;
}
}
LOGD("End of frames");
if (stream->has_pending) {
av_packet_unref(&stream->pending);
}
av_parser_close(stream->parser);
finally_stop_and_join_recorder:
if (stream->recorder) {
recorder_stop(stream->recorder);
LOGI("Finishing recording...");
recorder_join(stream->recorder);
}
finally_close_recorder:
if (stream->recorder) {
recorder_close(stream->recorder);
}
finally_close_decoder:
if (stream->decoder) {
decoder_close(stream->decoder);
}
finally_free_codec_ctx:
avcodec_free_context(&stream->codec_ctx);
end:
notify_stopped();
return 0;
}
void
stream_init(struct stream *stream, socket_t socket,
struct decoder *decoder, struct recorder *recorder) {
stream->socket = socket;
stream->decoder = decoder,
stream->recorder = recorder;
stream->has_pending = false;
}
bool
stream_start(struct stream *stream) {
LOGD("Starting stream thread");
bool ok = sc_thread_create(&stream->thread, run_stream, "stream", stream);
if (!ok) {
LOGC("Could not start stream thread");
return false;
}
return true;
}
void
stream_join(struct stream *stream) {
sc_thread_join(&stream->thread, NULL);
}