scuffle_ffmpeg/
filter_graph.rs

1use std::ffi::CString;
2use std::ptr::NonNull;
3
4use crate::error::{FfmpegError, FfmpegErrorCode};
5use crate::ffi::*;
6use crate::frame::GenericFrame;
7use crate::smart_object::SmartPtr;
8
9/// A filter graph. Used to chain filters together when transforming media data.
10pub struct FilterGraph(SmartPtr<AVFilterGraph>);
11
12/// Safety: `FilterGraph` is safe to send between threads.
13unsafe impl Send for FilterGraph {}
14
15impl FilterGraph {
16    /// Creates a new filter graph.
17    pub fn new() -> Result<Self, FfmpegError> {
18        // Safety: the pointer returned from avfilter_graph_alloc is valid
19        let ptr = unsafe { avfilter_graph_alloc() };
20        // Safety: The pointer here is valid.
21        unsafe { Self::wrap(ptr) }.ok_or(FfmpegError::Alloc)
22    }
23
24    /// Safety: `ptr` must be a valid pointer to an `AVFilterGraph`.
25    const unsafe fn wrap(ptr: *mut AVFilterGraph) -> Option<Self> {
26        let destructor = |ptr: &mut *mut AVFilterGraph| {
27            // Safety: The pointer here is valid.
28            unsafe { avfilter_graph_free(ptr) };
29        };
30
31        if ptr.is_null() {
32            return None;
33        }
34
35        // Safety: The pointer here is valid.
36        Some(Self(unsafe { SmartPtr::wrap(ptr, destructor) }))
37    }
38
39    /// Get the pointer to the filter graph.
40    pub const fn as_ptr(&self) -> *const AVFilterGraph {
41        self.0.as_ptr()
42    }
43
44    /// Get the mutable pointer to the filter graph.
45    pub const fn as_mut_ptr(&mut self) -> *mut AVFilterGraph {
46        self.0.as_mut_ptr()
47    }
48
49    /// Add a filter to the filter graph.
50    pub fn add(&mut self, filter: Filter, name: &str, args: &str) -> Result<FilterContext<'_>, FfmpegError> {
51        let name = CString::new(name).or(Err(FfmpegError::Arguments("name must be non-empty")))?;
52        let args = CString::new(args).or(Err(FfmpegError::Arguments("args must be non-empty")))?;
53
54        let mut filter_context = std::ptr::null_mut();
55
56        // Safety: avfilter_graph_create_filter is safe to call, 'filter_context' is a
57        // valid pointer
58        FfmpegErrorCode(unsafe {
59            avfilter_graph_create_filter(
60                &mut filter_context,
61                filter.as_ptr(),
62                name.as_ptr(),
63                args.as_ptr(),
64                std::ptr::null_mut(),
65                self.as_mut_ptr(),
66            )
67        })
68        .result()?;
69
70        // Safety: 'filter_context' is a valid pointer
71        Ok(FilterContext(unsafe {
72            NonNull::new(filter_context).ok_or(FfmpegError::Alloc)?.as_mut()
73        }))
74    }
75
76    /// Get a filter context by name.
77    pub fn get(&mut self, name: &str) -> Option<FilterContext<'_>> {
78        let name = CString::new(name).ok()?;
79
80        // Safety: avfilter_graph_get_filter is safe to call, and the returned pointer
81        // is valid
82        let mut ptr = NonNull::new(unsafe { avfilter_graph_get_filter(self.as_mut_ptr(), name.as_ptr()) })?;
83        // Safety: The pointer here is valid.
84        Some(FilterContext(unsafe { ptr.as_mut() }))
85    }
86
87    /// Validate the filter graph.
88    pub fn validate(&mut self) -> Result<(), FfmpegError> {
89        // Safety: avfilter_graph_config is safe to call
90        FfmpegErrorCode(unsafe { avfilter_graph_config(self.as_mut_ptr(), std::ptr::null_mut()) }).result()?;
91        Ok(())
92    }
93
94    /// Dump the filter graph to a string.
95    pub fn dump(&mut self) -> Option<String> {
96        // Safety: avfilter_graph_dump is safe to call
97        let dump = unsafe { avfilter_graph_dump(self.as_mut_ptr(), std::ptr::null_mut()) };
98        let destructor = |ptr: &mut *mut libc::c_char| {
99            // Safety: The pointer here is valid.
100            unsafe { av_free(*ptr as *mut libc::c_void) };
101            *ptr = std::ptr::null_mut();
102        };
103
104        // Safety: The pointer here is valid.
105        let c_str = unsafe { SmartPtr::wrap_non_null(dump, destructor)? };
106
107        // Safety: The pointer here is valid.
108        let c_str = unsafe { std::ffi::CStr::from_ptr(c_str.as_ptr()) };
109
110        Some(c_str.to_str().ok()?.to_owned())
111    }
112
113    /// Set the thread count for the filter graph.
114    pub const fn set_thread_count(&mut self, threads: i32) {
115        self.0.as_deref_mut_except().nb_threads = threads;
116    }
117
118    /// Add an input to the filter graph.
119    pub fn input(&mut self, name: &str, pad: i32) -> Result<FilterGraphParser<'_>, FfmpegError> {
120        FilterGraphParser::new(self).input(name, pad)
121    }
122
123    /// Add an output to the filter graph.
124    pub fn output(&mut self, name: &str, pad: i32) -> Result<FilterGraphParser<'_>, FfmpegError> {
125        FilterGraphParser::new(self).output(name, pad)
126    }
127}
128
129/// A parser for the filter graph. Allows you to create a filter graph from a string specification.
130pub struct FilterGraphParser<'a> {
131    graph: &'a mut FilterGraph,
132    inputs: SmartPtr<AVFilterInOut>,
133    outputs: SmartPtr<AVFilterInOut>,
134}
135
136/// Safety: `FilterGraphParser` is safe to send between threads.
137unsafe impl Send for FilterGraphParser<'_> {}
138
139impl<'a> FilterGraphParser<'a> {
140    /// Create a new `FilterGraphParser`.
141    fn new(graph: &'a mut FilterGraph) -> Self {
142        Self {
143            graph,
144            // Safety: 'avfilter_inout_free' is safe to call with a null pointer, and the pointer is valid
145            inputs: SmartPtr::null(|ptr| {
146                // Safety: The pointer here is valid.
147                unsafe { avfilter_inout_free(ptr) };
148            }),
149            // Safety: 'avfilter_inout_free' is safe to call with a null pointer, and the pointer is valid
150            outputs: SmartPtr::null(|ptr| {
151                // Safety: The pointer here is valid.
152                unsafe { avfilter_inout_free(ptr) };
153            }),
154        }
155    }
156
157    /// Add an input to the filter graph.
158    pub fn input(self, name: &str, pad: i32) -> Result<Self, FfmpegError> {
159        self.inout_impl(name, pad, false)
160    }
161
162    /// Add an output to the filter graph.
163    pub fn output(self, name: &str, pad: i32) -> Result<Self, FfmpegError> {
164        self.inout_impl(name, pad, true)
165    }
166
167    /// Parse the filter graph specification.
168    pub fn parse(mut self, spec: &str) -> Result<(), FfmpegError> {
169        let spec = CString::new(spec).unwrap();
170
171        // Safety: 'avfilter_graph_parse_ptr' is safe to call and all the pointers are
172        // valid.
173        FfmpegErrorCode(unsafe {
174            avfilter_graph_parse_ptr(
175                self.graph.as_mut_ptr(),
176                spec.as_ptr(),
177                self.inputs.as_mut(),
178                self.outputs.as_mut(),
179                std::ptr::null_mut(),
180            )
181        })
182        .result()?;
183
184        Ok(())
185    }
186
187    fn inout_impl(mut self, name: &str, pad: i32, output: bool) -> Result<Self, FfmpegError> {
188        let context = self.graph.get(name).ok_or(FfmpegError::Arguments("unknown name"))?;
189
190        let destructor = |ptr: &mut *mut AVFilterInOut| {
191            // Safety: The pointer here is valid allocated via `avfilter_inout_alloc`
192            unsafe { avfilter_inout_free(ptr) };
193        };
194
195        // Safety: `avfilter_inout_alloc` is safe to call.
196        let inout = unsafe { avfilter_inout_alloc() };
197
198        // Safety: 'avfilter_inout_alloc' is safe to call, and the returned pointer is
199        // valid
200        let mut inout = unsafe { SmartPtr::wrap_non_null(inout, destructor) }.ok_or(FfmpegError::Alloc)?;
201
202        let name = CString::new(name).map_err(|_| FfmpegError::Arguments("name must be non-empty"))?;
203
204        inout.as_deref_mut_except().name = name.into_raw();
205        inout.as_deref_mut_except().filter_ctx = context.0;
206        inout.as_deref_mut_except().pad_idx = pad;
207
208        if output {
209            inout.as_deref_mut_except().next = self.outputs.into_inner();
210            self.outputs = inout;
211        } else {
212            inout.as_deref_mut_except().next = self.inputs.into_inner();
213            self.inputs = inout;
214        }
215
216        Ok(self)
217    }
218}
219
220/// A filter. Thin wrapper around [`AVFilter`].
221#[derive(Clone, Copy, PartialEq, Eq)]
222pub struct Filter(*const AVFilter);
223
224impl Filter {
225    /// Get a filter by name.
226    pub fn get(name: &str) -> Option<Self> {
227        let name = std::ffi::CString::new(name).ok()?;
228
229        // Safety: avfilter_get_by_name is safe to call, and the returned pointer is
230        // valid
231        let filter = unsafe { avfilter_get_by_name(name.as_ptr()) };
232
233        if filter.is_null() {
234            None
235        } else {
236            Some(Self(filter))
237        }
238    }
239
240    /// Get the pointer to the filter.
241    pub const fn as_ptr(&self) -> *const AVFilter {
242        self.0
243    }
244
245    /// # Safety
246    /// `ptr` must be a valid pointer.
247    pub const unsafe fn wrap(ptr: *const AVFilter) -> Self {
248        Self(ptr)
249    }
250}
251
252/// Safety: `Filter` is safe to send between threads.
253unsafe impl Send for Filter {}
254
255/// A filter context. Thin wrapper around `AVFilterContext`.
256pub struct FilterContext<'a>(&'a mut AVFilterContext);
257
258/// Safety: `FilterContext` is safe to send between threads.
259unsafe impl Send for FilterContext<'_> {}
260
261impl<'a> FilterContext<'a> {
262    /// Returns a source for the filter context.
263    pub const fn source(self) -> FilterContextSource<'a> {
264        FilterContextSource(self.0)
265    }
266
267    /// Returns a sink for the filter context.
268    pub const fn sink(self) -> FilterContextSink<'a> {
269        FilterContextSink(self.0)
270    }
271}
272
273/// A source for a filter context. Where this is specifically used to send frames to the filter context.
274pub struct FilterContextSource<'a>(&'a mut AVFilterContext);
275
276/// Safety: `FilterContextSource` is safe to send between threads.
277unsafe impl Send for FilterContextSource<'_> {}
278
279impl FilterContextSource<'_> {
280    /// Sends a frame to the filter context.
281    pub fn send_frame(&mut self, frame: &GenericFrame) -> Result<(), FfmpegError> {
282        // Safety: `frame` is a valid pointer, and `self.0` is a valid pointer.
283        FfmpegErrorCode(unsafe { av_buffersrc_write_frame(self.0, frame.as_ptr()) }).result()?;
284        Ok(())
285    }
286
287    /// Sends an EOF frame to the filter context.
288    pub fn send_eof(&mut self, pts: Option<i64>) -> Result<(), FfmpegError> {
289        if let Some(pts) = pts {
290            // Safety: `av_buffersrc_close` is safe to call.
291            FfmpegErrorCode(unsafe { av_buffersrc_close(self.0, pts, 0) }).result()?;
292        } else {
293            // Safety: `av_buffersrc_write_frame` is safe to call.
294            FfmpegErrorCode(unsafe { av_buffersrc_write_frame(self.0, std::ptr::null()) }).result()?;
295        }
296
297        Ok(())
298    }
299}
300
301/// A sink for a filter context. Where this is specifically used to receive frames from the filter context.
302pub struct FilterContextSink<'a>(&'a mut AVFilterContext);
303
304/// Safety: `FilterContextSink` is safe to send between threads.
305unsafe impl Send for FilterContextSink<'_> {}
306
307impl FilterContextSink<'_> {
308    /// Receives a frame from the filter context.
309    pub fn receive_frame(&mut self) -> Result<Option<GenericFrame>, FfmpegError> {
310        let mut frame = GenericFrame::new()?;
311
312        // Safety: `frame` is a valid pointer, and `self.0` is a valid pointer.
313        match FfmpegErrorCode(unsafe { av_buffersink_get_frame(self.0, frame.as_mut_ptr()) }) {
314            code if code.is_success() => Ok(Some(frame)),
315            FfmpegErrorCode::Eagain | FfmpegErrorCode::Eof => Ok(None),
316            code => Err(FfmpegError::Code(code)),
317        }
318    }
319}
320
#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
mod tests {
    use std::ffi::CString;

    use crate::ffi::avfilter_get_by_name;
    use crate::filter_graph::{Filter, FilterGraph, FilterGraphParser};
    use crate::frame::GenericFrame;
    use crate::AVSampleFormat;

    /// Option string used when adding the video `buffer` filter.
    const BUFFER_ARGS: &str = "width=1920:height=1080:pix_fmt=0:time_base=1/30";

    /// Option string used when adding the `anullsrc` filter.
    const ANULLSRC_ARGS: &str = "sample_rate=44100:channel_layout=stereo";

    /// Audio buffer source feeding a buffer sink.
    const ABUFFER_TO_BUFFERSINK: &str =
        "abuffer=sample_rate=44100:sample_fmt=s16:channel_layout=stereo:time_base=1/44100 [out]; [out] abuffersink";

    /// Audio buffer source feeding a null sink.
    const ABUFFER_TO_NULLSINK: &str =
        "abuffer=sample_rate=44100:sample_fmt=s16:channel_layout=stereo:time_base=1/44100 [out]; [out] anullsink";

    /// Allocates an empty filter graph.
    fn new_graph() -> FilterGraph {
        FilterGraph::new().expect("Failed to create filter graph")
    }

    /// Looks up the `buffer` filter through the raw FFI call and wraps the pointer.
    fn buffer_filter() -> Filter {
        let filter_name = "buffer";
        let c_name = CString::new(filter_name).unwrap();
        // Safety: `avfilter_get_by_name` is safe to call.
        let filter_ptr = unsafe { avfilter_get_by_name(c_name.as_ptr()) };
        assert!(
            !filter_ptr.is_null(),
            "avfilter_get_by_name should return a valid pointer for filter '{}'",
            filter_name
        );

        // Safety: The pointer here is valid.
        unsafe { Filter::wrap(filter_ptr) }
    }

    /// Creates a graph containing one `filter_name` filter registered as `instance`.
    fn graph_with(filter_name: &str, instance: &str, args: &str) -> FilterGraph {
        let mut graph = new_graph();
        let filter = Filter::get(filter_name).expect(&format!("Failed to get '{}' filter", filter_name));
        graph
            .add(filter, instance, args)
            .expect(&format!("Failed to add '{}' filter", filter_name));
        graph
    }

    /// Parses `spec` into a fresh graph and validates it.
    fn parsed_graph(spec: &str) -> FilterGraph {
        let mut graph = new_graph();
        FilterGraphParser::new(&mut graph)
            .parse(spec)
            .expect("Failed to parse filter graph spec");
        graph.validate().expect("Failed to validate filter graph");
        graph
    }

    #[test]
    fn test_filter_graph_new() {
        match FilterGraph::new() {
            Ok(graph) => assert!(!graph.as_ptr().is_null(), "FilterGraph pointer should not be null"),
            Err(_) => panic!("FilterGraph::new should create a valid filter graph"),
        }
    }

    #[test]
    fn test_filter_graph_as_mut_ptr() {
        let mut graph = new_graph();

        assert!(
            !graph.as_mut_ptr().is_null(),
            "FilterGraph::as_mut_ptr should return a valid pointer"
        );
    }

    #[test]
    fn test_filter_graph_add() {
        let mut graph = new_graph();
        let filter = buffer_filter();

        match graph.add(filter, "buffer_filter", BUFFER_ARGS) {
            Ok(context) => assert!(
                !context.0.filter.is_null(),
                "The filter context should have a valid filter pointer"
            ),
            Err(_) => panic!("FilterGraph::add should successfully add a filter to the graph"),
        }
    }

    #[test]
    fn test_filter_graph_get() {
        let mut graph = new_graph();
        let filter = buffer_filter();
        let name = "buffer_filter";
        graph
            .add(filter, name, BUFFER_ARGS)
            .expect("Failed to add filter to the graph");

        match graph.get(name) {
            Some(filter_context) => assert!(
                !filter_context.0.filter.is_null(),
                "The retrieved FilterContext should have a valid filter pointer"
            ),
            None => panic!("FilterGraph::get should return Some(FilterContext) for an existing filter"),
        }

        assert!(
            graph.get("non_existent_filter").is_none(),
            "FilterGraph::get should return None for a non-existent filter"
        );
    }

    #[test]
    fn test_filter_graph_validate_and_dump() {
        let mut graph = new_graph();
        FilterGraphParser::new(&mut graph)
            .parse("anullsrc=sample_rate=44100:channel_layout=stereo [out0]; [out0] anullsink")
            .expect("Failed to parse filter graph spec");

        graph.validate().expect("FilterGraph::validate should succeed");
        let dump_output = graph.dump().expect("Failed to dump the filter graph");

        for filter_type in ["anullsrc", "anullsink"] {
            assert!(
                dump_output.contains(filter_type),
                "Dump output should include the '{}' filter type",
                filter_type
            );
        }
    }

    #[test]
    fn test_filter_graph_set_thread_count() {
        let mut graph = new_graph();

        for threads in [4, 8] {
            graph.set_thread_count(threads);
            // Safety: The pointer here is valid.
            let actual = unsafe { (*graph.as_mut_ptr()).nb_threads };
            assert_eq!(actual, threads, "Thread count should be set to {}", threads);
        }
    }

    #[test]
    fn test_filter_graph_input() {
        let mut graph = graph_with("anullsrc", "src", ANULLSRC_ARGS);
        let input_parser = graph.input("src", 0).expect("Failed to set input for the filter graph");

        // Read the parser's pointer first so its mutable borrow is no longer needed.
        let parser_graph_ptr = input_parser.graph.as_ptr();
        assert!(
            parser_graph_ptr == graph.as_ptr(),
            "Input parser should belong to the same filter graph"
        );
    }

    #[test]
    fn test_filter_graph_output() {
        let mut graph = graph_with("anullsink", "sink", "");
        let output_parser = graph
            .output("sink", 0)
            .expect("Failed to set output for the filter graph");

        // Read the parser's pointer first so its mutable borrow is no longer needed.
        let parser_graph_ptr = output_parser.graph.as_ptr();
        assert!(
            parser_graph_ptr == graph.as_ptr(),
            "Output parser should belong to the same filter graph"
        );
    }

    #[test]
    fn test_filter_context_source() {
        let mut graph = graph_with("anullsrc", "src", ANULLSRC_ARGS);
        let source_context = graph
            .get("src")
            .expect("Failed to retrieve 'src' filter context")
            .source();

        assert!(
            std::ptr::eq(source_context.0, graph.get("src").unwrap().0),
            "Source context should wrap the same filter as the original filter context"
        );
    }

    #[test]
    fn test_filter_context_sink() {
        let mut graph = graph_with("anullsink", "sink", "");
        let sink_context = graph
            .get("sink")
            .expect("Failed to retrieve 'sink' filter context")
            .sink();

        assert!(
            std::ptr::eq(sink_context.0, graph.get("sink").unwrap().0),
            "Sink context should wrap the same filter as the original filter context"
        );
    }

    #[test]
    fn test_filter_context_source_send_and_receive_frame() {
        let mut graph = parsed_graph(ABUFFER_TO_BUFFERSINK);

        let mut frame = GenericFrame::new().expect("Failed to create frame");
        frame.set_format(AVSampleFormat::S16.into());
        let mut audio_frame = frame.audio();
        audio_frame.set_nb_samples(1024);
        audio_frame.set_sample_rate(44100);

        let layout_result = audio_frame.set_channel_layout_default(2);
        assert!(layout_result.is_ok(), "Failed to set default channel layout");

        // Safety: `audio_frame` is a valid pointer. And we dont attempt to read from the frame until after the
        // allocation.
        let alloc_result = unsafe { audio_frame.alloc_frame_buffer(None) };
        assert!(alloc_result.is_ok(), "Failed to allocate frame buffer");

        let mut source_context = graph
            .get("Parsed_abuffer_0")
            .expect("Failed to retrieve source filter context")
            .source();

        let send_result = source_context.send_frame(&audio_frame);
        assert!(send_result.is_ok(), "send_frame should succeed when sending a valid frame");

        let mut sink_context = graph
            .get("Parsed_abuffersink_1")
            .expect("Failed to retrieve sink filter context")
            .sink();
        let received = sink_context
            .receive_frame()
            .expect("Failed to receive frame from sink context");

        let Some(received_frame) = received else {
            panic!("No frame received from sink context");
        };

        insta::assert_debug_snapshot!(received_frame, @r"
        GenericFrame {
            pts: None,
            dts: None,
            duration: Some(
                1024,
            ),
            best_effort_timestamp: None,
            time_base: Rational {
                numerator: 0,
                denominator: 1,
            },
            format: 1,
            is_audio: true,
            is_video: false,
        }
        ");
    }

    #[test]
    fn test_filter_context_source_send_frame_error() {
        let mut graph = parsed_graph(ABUFFER_TO_NULLSINK);

        let mut source_context = graph
            .get("Parsed_abuffer_0")
            .expect("Failed to retrieve 'Parsed_abuffer_0' filter context")
            .source();

        // The graph expects s16 audio; this frame only has a mismatched format set.
        let mut frame = GenericFrame::new().expect("Failed to create frame");
        frame.set_format(AVSampleFormat::Fltp.into());

        assert!(
            source_context.send_frame(&frame).is_err(),
            "send_frame should fail when sending an invalid frame"
        );
    }

    #[test]
    fn test_filter_context_source_send_and_receive_eof() {
        let mut graph = parsed_graph(ABUFFER_TO_BUFFERSINK);

        {
            let mut source_context = graph
                .get("Parsed_abuffer_0")
                .expect("Failed to retrieve source filter context")
                .source();

            assert!(
                source_context.send_eof(Some(12345)).is_ok(),
                "send_eof with PTS should succeed"
            );
            assert!(source_context.send_eof(None).is_ok(), "send_eof without PTS should succeed");
        }

        {
            let mut sink_context = graph
                .get("Parsed_abuffersink_1")
                .expect("Failed to retrieve sink filter context")
                .sink();

            match sink_context.receive_frame() {
                Ok(received_frame) => assert!(received_frame.is_none(), "No frame should be received after EOF"),
                Err(_) => panic!("receive_frame should succeed after EOF is sent"),
            }
        }
    }
}