diff --git a/bot/core/util.go b/bot/core/util.go index f2b6790..77ad514 100644 --- a/bot/core/util.go +++ b/bot/core/util.go @@ -57,10 +57,10 @@ func insertVideoInfo( format *models.MediaFormat, filePath string, ) { - width, height, duration := av.GetVideoInfo(filePath) + duration, width, height := av.GetVideoInfo(filePath) + format.Duration = duration format.Width = width format.Height = height - format.Duration = duration } func GetMessageFileID(msg *gotgbot.Message) string { diff --git a/database/main.go b/database/main.go index dab7caa..dfd0792 100644 --- a/database/main.go +++ b/database/main.go @@ -41,6 +41,9 @@ func Start() { if err != nil { log.Fatalf("failed to get database connection: %v", err) } + sqlDB.SetMaxIdleConns(10) + sqlDB.SetMaxOpenConns(100) + sqlDB.SetConnMaxLifetime(time.Hour) err = sqlDB.Ping() if err != nil { log.Fatalf("failed to ping database: %v", err) diff --git a/ext/instagram/main.go b/ext/instagram/main.go index 187d848..6914c6d 100644 --- a/ext/instagram/main.go +++ b/ext/instagram/main.go @@ -16,6 +16,8 @@ import ( // feel free to open PR, if you want to // add support for the official Instagram API +var httpSession = util.GetHTTPSession() + const ( apiHostname = "api.igram.world" apiKey = "aaeaf2805cea6abef3f9d2b6a666fce62fd9d612a43ab772bb50ce81455112e0" @@ -42,8 +44,6 @@ var igHeaders = map[string]string{ "User-Agent": util.ChromeUA, } -var HTTPSession = util.NewHTTPSession() - var Extractor = &models.Extractor{ Name: "Instagram", CodeName: "instagram", @@ -96,7 +96,7 @@ var ShareURLExtractor = &models.Extractor{ for k, v := range igHeaders { req.Header.Set(k, v) } - resp, err := HTTPSession.Do(req) + resp, err := httpSession.Do(req) if err != nil { return nil, fmt.Errorf("failed to send request: %w", err) } @@ -183,7 +183,7 @@ func GetVideoAPI(contentURL string) (*IGramResponse, error) { req.Header.Set("Content-Type", "application/json") req.Header.Set("User-Agent", util.ChromeUA) - resp, err := HTTPSession.Do(req) + resp, err := httpSession.Do(req) if err != nil { return nil, fmt.Errorf("failed to send request: %w", err) } diff --git a/ext/instagram/util.go b/ext/instagram/util.go index 4e9d4eb..ed6c7b9 100644 --- a/ext/instagram/util.go +++ b/ext/instagram/util.go @@ -115,7 +115,7 @@ func GetPostCaption( req.Header.Set("Cache-Control", "no-cache") req.Header.Set("TE", "trailers") - resp, err := HTTPSession.Do(req) + resp, err := httpSession.Do(req) if err != nil { return "", fmt.Errorf("failed to send request: %w", err) } diff --git a/ext/pinterest/main.go b/ext/pinterest/main.go index f639836..b0f6974 100644 --- a/ext/pinterest/main.go +++ b/ext/pinterest/main.go @@ -17,7 +17,7 @@ const ( shortenerAPIFormat = "https://api.pinterest.com/url_shortener/%s/redirect/" ) -var HTTPSession = util.NewHTTPSession() +var httpSession = util.GetHTTPSession() var ShortExtractor = &models.Extractor{ Name: "Pinterest (Short)", @@ -148,7 +148,7 @@ func GetPinData(pinID string) (*PinData, error) { // fix 403 error req.Header.Set("X-Pinterest-PWS-Handler", "www/[username].js") - resp, err := HTTPSession.Do(req) + resp, err := httpSession.Do(req) if err != nil { return nil, fmt.Errorf("failed to send request: %w", err) } diff --git a/ext/reddit/main.go b/ext/reddit/main.go index 8335641..1bf7f4e 100644 --- a/ext/reddit/main.go +++ b/ext/reddit/main.go @@ -12,7 +12,7 @@ import ( "govd/util" ) -var HTTPSession = util.NewHTTPSession() +var httpSession = util.GetHTTPSession() var ShortExtractor = &models.Extractor{ Name: "Reddit (Short)", @@ -37,7 +37,7 @@ var ShortExtractor = &models.Extractor{ req.AddCookie(cookie) } - res, err := HTTPSession.Do(req) + res, err := httpSession.Do(req) if err != nil { return nil, fmt.Errorf("failed to send request: %w", err) } @@ -229,7 +229,7 @@ func GetRedditData(host string, slug string) (RedditResponse, error) { req.AddCookie(cookie) } - res, err := HTTPSession.Do(req) + res, err := httpSession.Do(req) if err != nil { return nil, fmt.Errorf("failed to send request: %w", err) } diff --git a/ext/tiktok/main.go b/ext/tiktok/main.go index fe58b89..caa3752 100644 --- a/ext/tiktok/main.go +++ b/ext/tiktok/main.go @@ -23,7 +23,7 @@ const ( appUserAgent = packageID + " (Linux; U; Android 13; en_US; Pixel 7; Build/TD1A.220804.031; Cronet/58.0.2991.0)" ) -var HTTPSession = util.NewHTTPSession() +var httpSession = util.GetHTTPSession() var VMExtractor = &models.Extractor{ Name: "TikTok VM", @@ -147,7 +147,7 @@ func GetVideoAPI(awemeID string) (*AwemeDetails, error) { req.Header.Set("Accept", "application/json") req.Header.Set("X-Argus", "") - resp, err := HTTPSession.Do(req) + resp, err := httpSession.Do(req) if err != nil { return nil, fmt.Errorf("failed to send request: %w", err) } diff --git a/ext/twitter/main.go b/ext/twitter/main.go index a79877e..9aefb4b 100644 --- a/ext/twitter/main.go +++ b/ext/twitter/main.go @@ -17,7 +17,7 @@ const ( apiEndpoint = "https://x.com/i/api/graphql/zZXycP0V6H7m-2r0mOnFcA/TweetDetail" ) -var HTTPSession = util.NewHTTPSession() +var httpSession = util.GetHTTPSession() var ShortExtractor = &models.Extractor{ Name: "Twitter (Short)", @@ -33,7 +33,7 @@ var ShortExtractor = &models.Extractor{ return nil, fmt.Errorf("failed to create req: %w", err) } req.Header.Set("User-Agent", util.ChromeUA) - res, err := HTTPSession.Do(req) + res, err := httpSession.Do(req) if err != nil { return nil, fmt.Errorf("failed to send request: %w", err) } @@ -151,7 +151,7 @@ func GetTweetAPI(tweetID string) (*Tweet, error) { } req.URL.RawQuery = q.Encode() - resp, err := HTTPSession.Do(req) + resp, err := httpSession.Do(req) if err != nil { return nil, fmt.Errorf("failed to send request: %w", err) } diff --git a/util/av/thumbnail.go b/util/av/thumbnail.go index 555f2be..741cff1 100644 --- a/util/av/thumbnail.go +++ b/util/av/thumbnail.go @@ -12,10 +12,7 @@ func ExtractVideoThumbnail( Input(videoPath). Output(thumbnailPath, ffmpeg.KwArgs{ "vframes": 1, - "f": "image2", "ss": "00:00:01", - "c:v": "mjpeg", - "q:v": 10, // not sure }). Silent(true). OverWriteOutput(). diff --git a/util/av/videoinfo.go b/util/av/videoinfo.go index 638d6e6..d96240c 100644 --- a/util/av/videoinfo.go +++ b/util/av/videoinfo.go @@ -10,9 +10,9 @@ func GetVideoInfo(filePath string) (int64, int64, int64) { if err != nil { return 0, 0, 0 } - duration := gjson.Get(probeData, "format.duration").Int() + duration := gjson.Get(probeData, "format.duration").Float() width := gjson.Get(probeData, "streams.0.width").Int() height := gjson.Get(probeData, "streams.0.height").Int() - return duration, width, height + return int64(duration), width, height } diff --git a/util/download.go b/util/download.go index 0e9135f..44af6e5 100644 --- a/util/download.go +++ b/util/download.go @@ -1,19 +1,24 @@ package util import ( + "bufio" "bytes" "context" "fmt" "io" + "log" "math" "net/http" "os" "path/filepath" + "runtime" "sync" "time" "govd/models" "govd/util/av" + + "github.com/google/uuid" ) func DefaultConfig() *models.DownloadConfig { @@ -81,10 +86,19 @@ func DownloadFileWithSegments( if err := EnsureDownloadDir(config.DownloadDir); err != nil { return "", err } - tempDir := filepath.Join(config.DownloadDir, "segments_"+time.Now().Format("20060102_150405")) + tempDir := filepath.Join( + config.DownloadDir, + "segments"+uuid.NewString(), + ) if err := os.MkdirAll(tempDir, 0755); err != nil { return "", fmt.Errorf("failed to create temporary directory: %w", err) } + var cleanupErr error + defer func() { + if cleanupErr = os.RemoveAll(tempDir); cleanupErr != nil { + log.Printf("warning: failed to clean up temp directory %s: %v\n", tempDir, cleanupErr) + } + }() downloadedFiles, err := DownloadSegments(ctx, segmentURLs, config) if err != nil { os.RemoveAll(tempDir) @@ -128,17 +142,27 @@ func DownloadFileInMemory( return nil, fmt.Errorf("%w: %v", ErrDownloadFailed, errs) } -func downloadInMemory(ctx context.Context, fileURL string, timeout time.Duration) ([]byte, error) { +func downloadInMemory( + ctx context.Context, + fileURL string, + timeout time.Duration, +) ([]byte, error) { reqCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + // continue with the request + } + req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, fileURL, nil) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } - session := GetHTTPSession() - resp, err := session.Do(req) + resp, err := httpSession.Do(req) if err != nil { return nil, fmt.Errorf("failed to download file: %w", err) } @@ -148,7 +172,17 @@ func downloadInMemory(ctx context.Context, fileURL string, timeout time.Duration return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) } - return io.ReadAll(resp.Body) + var buf bytes.Buffer + if resp.ContentLength > 0 { + buf.Grow(int(resp.ContentLength)) + } + + _, err = io.Copy(&buf, resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + return buf.Bytes(), nil } func EnsureDownloadDir(dir string) error { @@ -170,6 +204,12 @@ func runChunkedDownload( filePath string, config *models.DownloadConfig, ) error { + // reduce concurrency if it's greater + // than the number of available CPUs + if runtime.NumCPU() < config.Concurrency && runtime.GOMAXPROCS(0) < config.Concurrency { + config.Concurrency = runtime.NumCPU() + } + fileSize, err := getFileSize(ctx, fileURL, config.Timeout) if err != nil { return err @@ -193,7 +233,7 @@ func runChunkedDownload( semaphore := make(chan struct{}, config.Concurrency) var wg sync.WaitGroup - errChan := make(chan error, 1) + errChan := make(chan error, len(chunks)) var downloadErr error var errOnce sync.Once @@ -252,22 +292,40 @@ func runChunkedDownload( }(idx, chunk) } + done := make(chan struct{}) + go func() { wg.Wait() close(errChan) + close(done) }() + var multiErr []error + select { case err := <-errChan: if err != nil { - // clean up partial download - os.Remove(filePath) - return err + multiErr = append(multiErr, err) + // collect all errors + for e := range errChan { + if e != nil { + multiErr = append(multiErr, e) + } + } } + <-done case <-ctx.Done(): cancelDownload() + <-done // wait for all goroutines to finish os.Remove(filePath) return ctx.Err() + case <-done: + // no errors + } + + if len(multiErr) > 0 { + os.Remove(filePath) + return fmt.Errorf("multiple download errors: %v", multiErr) } return nil @@ -282,8 +340,7 @@ func getFileSize(ctx context.Context, fileURL string, timeout time.Duration) (in return 0, fmt.Errorf("failed to create request: %w", err) } - session := GetHTTPSession() - resp, err := session.Do(req) + resp, err := httpSession.Do(req) if err != nil { return 0, fmt.Errorf("failed to get file size: %w", err) } @@ -340,8 +397,7 @@ func downloadChunk( } req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", chunk[0], chunk[1])) - session := GetHTTPSession() - resp, err := session.Do(req) + resp, err := httpSession.Do(req) if err != nil { return nil, fmt.Errorf("download failed: %w", err) } @@ -351,7 +407,18 @@ func downloadChunk( return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) } - return io.ReadAll(resp.Body) + var buf bytes.Buffer + if resp.ContentLength > 0 { + buf.Grow(int(resp.ContentLength)) + } else { + buf.Grow(chunk[1] - chunk[0] + 1) + } + _, err = io.Copy(&buf, resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read chunk data: %w", err) + } + + return buf.Bytes(), nil } func writeChunkToFile(file *os.File, data []byte, offset int) error { @@ -388,7 +455,10 @@ func DownloadSegments( config = DefaultConfig() } - tempDir := filepath.Join(config.DownloadDir, "segments_"+time.Now().Format("20060102_150405")) + tempDir := filepath.Join( + config.DownloadDir, + "segments"+uuid.NewString(), + ) if err := os.MkdirAll(tempDir, 0755); err != nil { return nil, fmt.Errorf("failed to create temporary directory: %w", err) } @@ -397,14 +467,26 @@ func DownloadSegments( var wg sync.WaitGroup errChan := make(chan error, len(segmentURLs)) + var errMutex sync.Mutex + var firstErr error downloadedFiles := make([]string, len(segmentURLs)) + downloadCtx, cancelDownload := context.WithCancel(ctx) + defer cancelDownload() + for i, segmentURL := range segmentURLs { wg.Add(1) go func(idx int, url string) { defer wg.Done() + select { + case <-downloadCtx.Done(): + return + default: + // continue with the download + } + // acquire semaphore slot semaphore <- struct{}{} defer func() { <-semaphore }() @@ -424,7 +506,12 @@ func DownloadSegments( }) if err != nil { - errChan <- fmt.Errorf("failed to download segment %d: %w", idx, err) + errMutex.Lock() + if firstErr == nil { + firstErr = fmt.Errorf("failed to download segment %d: %w", idx, err) + cancelDownload() // Cancella tutte le altre download + } + errMutex.Unlock() return } @@ -466,7 +553,17 @@ func MergeSegmentFiles( if err != nil { return "", fmt.Errorf("failed to create output file: %w", err) } - defer outputFile.Close() + defer func() { + outputFile.Close() + if err != nil { + os.Remove(outputPath) + } + }() + + bufferedWriter := bufio.NewWriterSize( + outputFile, + 1024*1024, + ) // 1MB buffer var totalBytes int64 var processedBytes int64 @@ -483,6 +580,9 @@ func MergeSegmentFiles( for i, segmentPath := range segmentPaths { select { case <-ctx.Done(): + bufferedWriter.Flush() + outputFile.Close() + os.Remove(outputPath) return "", ctx.Err() default: segmentFile, err := os.Open(segmentPath) @@ -490,7 +590,7 @@ func MergeSegmentFiles( return "", fmt.Errorf("failed to open segment %d: %w", i, err) } - written, err := io.Copy(outputFile, segmentFile) + written, err := io.Copy(bufferedWriter, segmentFile) segmentFile.Close() if err != nil { @@ -505,6 +605,10 @@ func MergeSegmentFiles( } } + if err := bufferedWriter.Flush(); err != nil { + return "", fmt.Errorf("failed to flush data: %w", err) + } + if config.Remux { err := av.RemuxFile(outputPath) if err != nil { diff --git a/util/http.go b/util/http.go index 7891587..8c21c8c 100644 --- a/util/http.go +++ b/util/http.go @@ -1,22 +1,38 @@ package util import ( + "net" "net/http" + "sync" "time" ) -var httpSession = NewHTTPSession() - -func NewHTTPSession() *http.Client { - session := &http.Client{ - Timeout: 20 * time.Second, - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - }, - } - return session -} +var ( + httpSession *http.Client + httpSessionOnce sync.Once +) func GetHTTPSession() *http.Client { + httpSessionOnce.Do(func() { + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + MaxIdleConnsPerHost: 10, + MaxConnsPerHost: 10, + } + + httpSession = &http.Client{ + Transport: transport, + Timeout: 30 * time.Second, + } + }) return httpSession }