main: refactor HTTP session reuse, video info extraction, and download handling

- refactors http session usage across modules
- adjusts video info extraction
- improves temporary directory, concurrency limits, buffering, and error handling in download routines
This commit is contained in:
stefanodvx 2025-04-14 23:45:54 +02:00
parent 10c113f400
commit 58bd5827b3
12 changed files with 171 additions and 51 deletions

View file

@ -12,10 +12,7 @@ func ExtractVideoThumbnail(
Input(videoPath).
Output(thumbnailPath, ffmpeg.KwArgs{
"vframes": 1,
"f": "image2",
"ss": "00:00:01",
"c:v": "mjpeg",
"q:v": 10, // JPEG quality on ffmpeg's -q:v scale (lower = better); 10 chosen empirically — TODO confirm
}).
Silent(true).
OverWriteOutput().

View file

@ -10,9 +10,9 @@ func GetVideoInfo(filePath string) (int64, int64, int64) {
if err != nil {
return 0, 0, 0
}
duration := gjson.Get(probeData, "format.duration").Int()
duration := gjson.Get(probeData, "format.duration").Float()
width := gjson.Get(probeData, "streams.0.width").Int()
height := gjson.Get(probeData, "streams.0.height").Int()
return duration, width, height
return int64(duration), width, height
}

View file

@ -1,19 +1,24 @@
package util
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"log"
"math"
"net/http"
"os"
"path/filepath"
"runtime"
"sync"
"time"
"govd/models"
"govd/util/av"
"github.com/google/uuid"
)
func DefaultConfig() *models.DownloadConfig {
@ -81,10 +86,19 @@ func DownloadFileWithSegments(
if err := EnsureDownloadDir(config.DownloadDir); err != nil {
return "", err
}
tempDir := filepath.Join(config.DownloadDir, "segments_"+time.Now().Format("20060102_150405"))
tempDir := filepath.Join(
config.DownloadDir,
"segments"+uuid.NewString(),
)
if err := os.MkdirAll(tempDir, 0755); err != nil {
return "", fmt.Errorf("failed to create temporary directory: %w", err)
}
var cleanupErr error
defer func() {
if cleanupErr = os.RemoveAll(tempDir); cleanupErr != nil {
log.Printf("warning: failed to clean up temp directory %s: %v\n", tempDir, cleanupErr)
}
}()
downloadedFiles, err := DownloadSegments(ctx, segmentURLs, config)
if err != nil {
os.RemoveAll(tempDir)
@ -128,17 +142,27 @@ func DownloadFileInMemory(
return nil, fmt.Errorf("%w: %v", ErrDownloadFailed, errs)
}
func downloadInMemory(ctx context.Context, fileURL string, timeout time.Duration) ([]byte, error) {
func downloadInMemory(
ctx context.Context,
fileURL string,
timeout time.Duration,
) ([]byte, error) {
reqCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue with the request
}
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, fileURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
session := GetHTTPSession()
resp, err := session.Do(req)
resp, err := httpSession.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to download file: %w", err)
}
@ -148,7 +172,17 @@ func downloadInMemory(ctx context.Context, fileURL string, timeout time.Duration
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
return io.ReadAll(resp.Body)
var buf bytes.Buffer
if resp.ContentLength > 0 {
buf.Grow(int(resp.ContentLength))
}
_, err = io.Copy(&buf, resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
return buf.Bytes(), nil
}
func EnsureDownloadDir(dir string) error {
@ -170,6 +204,12 @@ func runChunkedDownload(
filePath string,
config *models.DownloadConfig,
) error {
// reduce concurrency if it's greater
// than the number of available CPUs
if runtime.NumCPU() < config.Concurrency && runtime.GOMAXPROCS(0) < config.Concurrency {
config.Concurrency = runtime.NumCPU()
}
fileSize, err := getFileSize(ctx, fileURL, config.Timeout)
if err != nil {
return err
@ -193,7 +233,7 @@ func runChunkedDownload(
semaphore := make(chan struct{}, config.Concurrency)
var wg sync.WaitGroup
errChan := make(chan error, 1)
errChan := make(chan error, len(chunks))
var downloadErr error
var errOnce sync.Once
@ -252,22 +292,40 @@ func runChunkedDownload(
}(idx, chunk)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(errChan)
close(done)
}()
var multiErr []error
select {
case err := <-errChan:
if err != nil {
// clean up partial download
os.Remove(filePath)
return err
multiErr = append(multiErr, err)
// collect all errors
for e := range errChan {
if e != nil {
multiErr = append(multiErr, e)
}
}
}
<-done
case <-ctx.Done():
cancelDownload()
<-done // wait for all goroutines to finish
os.Remove(filePath)
return ctx.Err()
case <-done:
// no errors
}
if len(multiErr) > 0 {
os.Remove(filePath)
return fmt.Errorf("multiple download errors: %v", multiErr)
}
return nil
@ -282,8 +340,7 @@ func getFileSize(ctx context.Context, fileURL string, timeout time.Duration) (in
return 0, fmt.Errorf("failed to create request: %w", err)
}
session := GetHTTPSession()
resp, err := session.Do(req)
resp, err := httpSession.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to get file size: %w", err)
}
@ -340,8 +397,7 @@ func downloadChunk(
}
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", chunk[0], chunk[1]))
session := GetHTTPSession()
resp, err := session.Do(req)
resp, err := httpSession.Do(req)
if err != nil {
return nil, fmt.Errorf("download failed: %w", err)
}
@ -351,7 +407,18 @@ func downloadChunk(
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
return io.ReadAll(resp.Body)
var buf bytes.Buffer
if resp.ContentLength > 0 {
buf.Grow(int(resp.ContentLength))
} else {
buf.Grow(chunk[1] - chunk[0] + 1)
}
_, err = io.Copy(&buf, resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read chunk data: %w", err)
}
return buf.Bytes(), nil
}
func writeChunkToFile(file *os.File, data []byte, offset int) error {
@ -388,7 +455,10 @@ func DownloadSegments(
config = DefaultConfig()
}
tempDir := filepath.Join(config.DownloadDir, "segments_"+time.Now().Format("20060102_150405"))
tempDir := filepath.Join(
config.DownloadDir,
"segments"+uuid.NewString(),
)
if err := os.MkdirAll(tempDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create temporary directory: %w", err)
}
@ -397,14 +467,26 @@ func DownloadSegments(
var wg sync.WaitGroup
errChan := make(chan error, len(segmentURLs))
var errMutex sync.Mutex
var firstErr error
downloadedFiles := make([]string, len(segmentURLs))
downloadCtx, cancelDownload := context.WithCancel(ctx)
defer cancelDownload()
for i, segmentURL := range segmentURLs {
wg.Add(1)
go func(idx int, url string) {
defer wg.Done()
select {
case <-downloadCtx.Done():
return
default:
// continue with the download
}
// acquire semaphore slot
semaphore <- struct{}{}
defer func() { <-semaphore }()
@ -424,7 +506,12 @@ func DownloadSegments(
})
if err != nil {
errChan <- fmt.Errorf("failed to download segment %d: %w", idx, err)
errMutex.Lock()
if firstErr == nil {
firstErr = fmt.Errorf("failed to download segment %d: %w", idx, err)
cancelDownload() // cancel all other in-flight segment downloads
}
errMutex.Unlock()
return
}
@ -466,7 +553,17 @@ func MergeSegmentFiles(
if err != nil {
return "", fmt.Errorf("failed to create output file: %w", err)
}
defer outputFile.Close()
defer func() {
outputFile.Close()
if err != nil {
os.Remove(outputPath)
}
}()
bufferedWriter := bufio.NewWriterSize(
outputFile,
1024*1024,
) // 1MB buffer
var totalBytes int64
var processedBytes int64
@ -483,6 +580,9 @@ func MergeSegmentFiles(
for i, segmentPath := range segmentPaths {
select {
case <-ctx.Done():
bufferedWriter.Flush()
outputFile.Close()
os.Remove(outputPath)
return "", ctx.Err()
default:
segmentFile, err := os.Open(segmentPath)
@ -490,7 +590,7 @@ func MergeSegmentFiles(
return "", fmt.Errorf("failed to open segment %d: %w", i, err)
}
written, err := io.Copy(outputFile, segmentFile)
written, err := io.Copy(bufferedWriter, segmentFile)
segmentFile.Close()
if err != nil {
@ -505,6 +605,10 @@ func MergeSegmentFiles(
}
}
if err := bufferedWriter.Flush(); err != nil {
return "", fmt.Errorf("failed to flush data: %w", err)
}
if config.Remux {
err := av.RemuxFile(outputPath)
if err != nil {

View file

@ -1,22 +1,38 @@
package util
import (
"net"
"net/http"
"sync"
"time"
)
var httpSession = NewHTTPSession()
func NewHTTPSession() *http.Client {
session := &http.Client{
Timeout: 20 * time.Second,
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
}
return session
}
var (
httpSession *http.Client
httpSessionOnce sync.Once
)
func GetHTTPSession() *http.Client {
httpSessionOnce.Do(func() {
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
MaxIdleConnsPerHost: 10,
MaxConnsPerHost: 10,
}
httpSession = &http.Client{
Transport: transport,
Timeout: 30 * time.Second,
}
})
return httpSession
}