// Media scrubber sidecar — tiny HTTP service that re-encodes video/audio
// through ffmpeg with all metadata stripped. Runs alongside the DChain
// node in docker-compose; the node calls it via DCHAIN_MEDIA_SIDECAR_URL.
//
// Contract (matches media.Scrubber in the node):
//
//	POST /scrub/video   Content-Type: video/*   body: raw bytes
//	  → 200, Content-Type: video/mp4, body: cleaned bytes
//	POST /scrub/audio   Content-Type: audio/*   body: raw bytes
//	  → 200, Content-Type: audio/ogg, body: cleaned bytes
//
// Error responses are text/plain:
//
//	405  method other than POST (response carries "Allow: POST")
//	413  body larger than MAX_INPUT_MB
//	400  body could not be read
//	422  ffmpeg failed, timed out, or produced no output
//
// ffmpeg flags of note:
//
//	-map_metadata -1     drop ALL source metadata (title, author, encoder,
//	                     GPS location atoms, XMP blocks, etc.)
//	-map_chapters -1     drop chapter markers too — chapter titles are
//	                     metadata that -map_metadata alone does not remove
//	-map 0:V -map 0:a?   keep only real video streams and audio (if any).
//	                     Capital V excludes attached pictures / cover art;
//	                     subtitles and data channels that might carry
//	                     hidden info are never mapped
//	-movflags +faststart+frag_keyframe
//	                     output goes to a pipe, which the MP4 muxer cannot
//	                     seek back into; fragmenting at keyframes lets it
//	                     stream while still writing the moov atom before
//	                     the media data, so clients can start playback
//	                     before the full download lands
//	-c:v libx264 -crf 28 -preset fast
//	                     h264 with aggressive-but-not-painful CRF; ~70-80%
//	                     size reduction on phone-camera source
//	-c:a libopus -b:a 64k
//	                     opus at 64 kbps is transparent for speech, fine
//	                     for music at feed quality
//
// Environment:
//
//	LISTEN_ADDR        default ":8090"
//	FFMPEG_BIN         default "ffmpeg" (must resolve via PATH, or be a path)
//	MAX_INPUT_MB       default 32 — reject anything larger pre-ffmpeg
//	JOB_TIMEOUT_SECS   default 60
//
// Integer settings that are non-numeric or not positive are ignored (with a
// log line) and the default is used instead.
//
// The service is deliberately dumb: no queuing, no DB, no state. If you
// need higher throughput, run N replicas behind a TCP load balancer.
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"os/exec"
	"strconv"
	"time"
)

func main() {
	addr := envOr("LISTEN_ADDR", ":8090")
	ffmpegBin := envOr("FFMPEG_BIN", "ffmpeg")
	maxInputMB := envInt("MAX_INPUT_MB", 32)
	jobTimeoutSecs := envInt("JOB_TIMEOUT_SECS", 60)

	// Fail fast if ffmpeg is missing — easier to debug at container start
	// than to surface cryptic errors per-request.
	if _, err := exec.LookPath(ffmpegBin); err != nil {
		log.Fatalf("ffmpeg not found in PATH (looked for %q): %v", ffmpegBin, err)
	}

	srv := &server{
		ffmpegBin:    ffmpegBin,
		maxInputSize: int64(maxInputMB) * 1024 * 1024,
		jobTimeout:   time.Duration(jobTimeoutSecs) * time.Second,
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/scrub/video", srv.scrubVideo)
	mux.HandleFunc("/scrub/audio", srv.scrubAudio)
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		_, _ = w.Write([]byte("ok"))
	})

	httpSrv := &http.Server{
		Addr:    addr,
		Handler: mux,
		// Cap how long a client may take to send headers (slowloris guard).
		ReadHeaderTimeout: 10 * time.Second,
		// The whole upload gets at most one job timeout. This assumes the
		// caller is a nearby peer (the node on the compose network) — raise
		// JOB_TIMEOUT_SECS if uploads legitimately take longer.
		ReadTimeout: srv.jobTimeout,
		// WriteTimeout is measured from the end of the request headers, so
		// it must cover body upload + the ffmpeg run + writing the response.
		WriteTimeout: 2*srv.jobTimeout + 30*time.Second,
		IdleTimeout:  2 * time.Minute,
	}

	log.Printf("media-sidecar: listening on %s, ffmpeg=%s, max_input=%d MiB, timeout=%ds",
		addr, ffmpegBin, maxInputMB, jobTimeoutSecs)
	if err := httpSrv.ListenAndServe(); err != nil {
		log.Fatalf("ListenAndServe: %v", err)
	}
}

// server holds the settings shared by every scrub request. Handlers only
// read these fields, so concurrent requests need no locking.
type server struct {
	ffmpegBin    string        // ffmpeg executable name or path
	maxInputSize int64         // request bodies above this many bytes are rejected
	jobTimeout   time.Duration // wall-clock budget for a single ffmpeg run
}

// requestError describes a problem with the request itself (as opposed to
// the media), together with the HTTP status the handler should answer with.
type requestError struct {
	status int
	msg    string
}

func (e *requestError) Error() string { return e.msg }

// scrubVideo handles POST /scrub/video: the body is re-encoded to a
// metadata-free H.264/Opus MP4.
func (s *server) scrubVideo(w http.ResponseWriter, r *http.Request) {
	s.scrub(w, r, "video", videoScrubArgs(), "video/mp4")
}

// scrubAudio handles POST /scrub/audio: the body is re-encoded to a
// metadata-free Opus-in-Ogg file.
func (s *server) scrubAudio(w http.ResponseWriter, r *http.Request) {
	s.scrub(w, r, "audio", audioScrubArgs(), "audio/ogg")
}

// scrub is the request pipeline shared by both endpoints: read the bounded
// body, run ffmpeg with args under the job timeout, and reply with the
// cleaned bytes labelled contentType. kind only tags log lines.
func (s *server) scrub(w http.ResponseWriter, r *http.Request, kind string, args []string, contentType string) {
	body, err := s.readLimited(r)
	if err != nil {
		status := http.StatusBadRequest
		var reqErr *requestError
		if errors.As(err, &reqErr) {
			status = reqErr.status
		}
		if status == http.StatusMethodNotAllowed {
			w.Header().Set("Allow", http.MethodPost)
		}
		httpErr(w, err.Error(), status)
		return
	}

	ctx, cancel := context.WithTimeout(r.Context(), s.jobTimeout)
	defer cancel()

	out, ffErr, err := s.runFFmpeg(ctx, args, body)
	switch {
	case err != nil && errors.Is(ctx.Err(), context.DeadlineExceeded):
		// exec only reports "signal: killed" here; name the real cause.
		err = fmt.Errorf("timed out after %s (%w)", s.jobTimeout, err)
	case err == nil && len(out) == 0:
		// A zero-byte 200 would be indistinguishable from a valid empty file.
		err = errors.New("produced no output")
	}
	if err != nil {
		log.Printf("%s scrub failed: %v | stderr=%s", kind, err, ffErr)
		httpErr(w, "ffmpeg failed: "+err.Error(), http.StatusUnprocessableEntity)
		return
	}

	w.Header().Set("Content-Type", contentType)
	w.Header().Set("Content-Length", strconv.Itoa(len(out)))
	// A write error means the client went away; there is no one to tell.
	_, _ = w.Write(out)
}

// videoScrubArgs returns the ffmpeg arguments for the video path: read
// stdin, keep only real video + optional audio, strip metadata and
// chapters, re-encode to H.264 CRF 28 / Opus 64k, write fragmented MP4 to
// stdout. A fresh slice is returned on every call.
func videoScrubArgs() []string {
	return []string{
		"-hide_banner", "-loglevel", "error",
		"-i", "pipe:0",
		"-map", "0:V", "-map", "0:a?",
		"-map_metadata", "-1",
		"-map_chapters", "-1",
		"-c:v", "libx264", "-preset", "fast", "-crf", "28",
		"-c:a", "libopus", "-b:a", "64k",
		"-movflags", "+faststart+frag_keyframe",
		"-f", "mp4",
		"pipe:1",
	}
}

// audioScrubArgs returns the ffmpeg arguments for the audio path: read
// stdin, keep only audio, strip metadata and chapters, re-encode to Opus
// 64k, write Ogg to stdout. A fresh slice is returned on every call.
func audioScrubArgs() []string {
	return []string{
		"-hide_banner", "-loglevel", "error",
		"-i", "pipe:0",
		"-vn",
		"-map", "0:a",
		"-map_metadata", "-1",
		"-map_chapters", "-1",
		"-c:a", "libopus", "-b:a", "64k",
		"-f", "ogg",
		"pipe:1",
	}
}

// runFFmpeg runs ffmpeg with args, feeding input on stdin. It returns the
// captured stdout and stderr; stderr is returned on failure as well so the
// caller can log it. The process is killed if ctx is done first.
func (s *server) runFFmpeg(ctx context.Context, args []string, input []byte) ([]byte, string, error) {
	cmd := exec.CommandContext(ctx, s.ffmpegBin, args...)
	cmd.Stdin = bytes.NewReader(input)
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr

	if err := cmd.Run(); err != nil {
		return nil, stderr.String(), err
	}
	return stdout.Bytes(), stderr.String(), nil
}

// readLimited returns the request body when r is a POST of at most
// s.maxInputSize bytes. Method and size violations are reported as
// *requestError carrying 405 / 413; read failures are plain wrapped errors.
func (s *server) readLimited(r *http.Request) ([]byte, error) {
	if r.Method != http.MethodPost {
		return nil, &requestError{status: http.StatusMethodNotAllowed, msg: "method not allowed"}
	}
	// Reject a declared-oversize upload before reading any of it. Chunked
	// uploads report ContentLength -1, so the post-read check below stays.
	if r.ContentLength > s.maxInputSize {
		return nil, s.tooLarge()
	}

	// Read one byte past the limit so "exactly max" and "over max" differ.
	buf, err := io.ReadAll(io.LimitReader(r.Body, s.maxInputSize+1))
	if err != nil {
		return nil, fmt.Errorf("read body: %w", err)
	}
	if int64(len(buf)) > s.maxInputSize {
		return nil, s.tooLarge()
	}
	return buf, nil
}

// tooLarge builds the 413 error for a body over s.maxInputSize.
func (s *server) tooLarge() error {
	return &requestError{
		status: http.StatusRequestEntityTooLarge,
		msg:    fmt.Sprintf("input exceeds %d bytes", s.maxInputSize),
	}
}

// httpErr writes msg as a text/plain response with the given status.
func httpErr(w http.ResponseWriter, msg string, status int) {
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	w.WriteHeader(status)
	_, _ = w.Write([]byte(msg))
}

// envOr returns the value of env var k, or d when it is unset or empty.
func envOr(k, d string) string {
	if v := os.Getenv(k); v != "" {
		return v
	}
	return d
}

// envInt returns env var k parsed as a positive integer, or d when it is
// unset. Non-numeric, zero and negative values also yield d, but are logged
// so a typo in the compose file does not silently run with defaults.
func envInt(k string, d int) int {
	v := os.Getenv(k)
	if v == "" {
		return d
	}
	n, err := strconv.Atoi(v)
	if err != nil || n <= 0 {
		log.Printf("media-sidecar: ignoring %s=%q (want a positive integer), using %d", k, v, d)
		return d
	}
	return n
}