Make stream push packets to sinks

Now that decoder and recorder implement the packet sink trait, make
stream push packets to the sinks without depending on the concrete sink
types.
This commit is contained in:
Romain Vimont
2021-04-11 15:01:05 +02:00
parent cbed38799e
commit f7a1b67d66
7 changed files with 82 additions and 65 deletions

View File

@@ -66,25 +66,11 @@ notify_stopped(void) {
}
static bool
process_config_packet(struct stream *stream, AVPacket *packet) {
if (stream->recorder && !recorder_push(stream->recorder, packet)) {
LOGE("Could not send config packet to recorder");
return false;
}
return true;
}
static bool
process_frame(struct stream *stream, AVPacket *packet) {
if (stream->decoder && !decoder_push(stream->decoder, packet)) {
return false;
}
if (stream->recorder) {
packet->dts = packet->pts;
if (!recorder_push(stream->recorder, packet)) {
LOGE("Could not send packet to recorder");
push_packet_to_sinks(struct stream *stream, const AVPacket *packet) {
for (unsigned i = 0; i < stream->sink_count; ++i) {
struct sc_packet_sink *sink = stream->sinks[i];
if (!sink->ops->push(sink, packet)) {
LOGE("Could not send config packet to sink %d", i);
return false;
}
}
@@ -111,9 +97,11 @@ stream_parse(struct stream *stream, AVPacket *packet) {
packet->flags |= AV_PKT_FLAG_KEY;
}
bool ok = process_frame(stream, packet);
packet->dts = packet->pts;
bool ok = push_packet_to_sinks(stream, packet);
if (!ok) {
LOGE("Could not process frame");
LOGE("Could not process packet");
return false;
}
@@ -156,7 +144,7 @@ stream_push_packet(struct stream *stream, AVPacket *packet) {
if (is_config) {
// config packet
bool ok = process_config_packet(stream, packet);
bool ok = push_packet_to_sinks(stream, packet);
if (!ok) {
return false;
}
@@ -177,6 +165,33 @@ stream_push_packet(struct stream *stream, AVPacket *packet) {
return true;
}
static void
stream_close_first_sinks(struct stream *stream, unsigned count) {
while (count) {
struct sc_packet_sink *sink = stream->sinks[--count];
sink->ops->close(sink);
}
}
static inline void
stream_close_sinks(struct stream *stream) {
stream_close_first_sinks(stream, stream->sink_count);
}
static bool
stream_open_sinks(struct stream *stream, const AVCodec *codec) {
for (unsigned i = 0; i < stream->sink_count; ++i) {
struct sc_packet_sink *sink = stream->sinks[i];
if (!sink->ops->open(sink, codec)) {
LOGE("Could not open packet sink %d", i);
stream_close_first_sinks(stream, i);
return false;
}
}
return true;
}
static int
run_stream(void *data) {
struct stream *stream = data;
@@ -193,22 +208,15 @@ run_stream(void *data) {
goto end;
}
if (stream->decoder && !decoder_open(stream->decoder, codec)) {
LOGE("Could not open decoder");
if (!stream_open_sinks(stream, codec)) {
LOGE("Could not open stream sinks");
goto finally_free_codec_ctx;
}
if (stream->recorder) {
if (!recorder_open(stream->recorder, codec)) {
LOGE("Could not open recorder");
goto finally_close_decoder;
}
}
stream->parser = av_parser_init(AV_CODEC_ID_H264);
if (!stream->parser) {
LOGE("Could not initialize parser");
goto finally_close_recorder;
goto finally_close_sinks;
}
// We must only pass complete frames to av_parser_parse2()!
@@ -238,14 +246,8 @@ run_stream(void *data) {
}
av_parser_close(stream->parser);
finally_close_recorder:
if (stream->recorder) {
recorder_close(stream->recorder);
}
finally_close_decoder:
if (stream->decoder) {
decoder_close(stream->decoder);
}
finally_close_sinks:
stream_close_sinks(stream);
finally_free_codec_ctx:
avcodec_free_context(&stream->codec_ctx);
end:
@@ -254,12 +256,18 @@ end:
}
void
stream_init(struct stream *stream, socket_t socket,
struct decoder *decoder, struct recorder *recorder) {
stream_init(struct stream *stream, socket_t socket) {
stream->socket = socket;
stream->decoder = decoder,
stream->recorder = recorder;
stream->has_pending = false;
stream->sink_count = 0;
}
void
stream_add_sink(struct stream *stream, struct sc_packet_sink *sink) {
assert(stream->sink_count < STREAM_MAX_SINKS);
assert(sink);
assert(sink->ops);
stream->sinks[stream->sink_count++] = sink;
}
bool