diff --git a/util/download.go b/util/download.go index aa9e620..a5bce34 100644 --- a/util/download.go +++ b/util/download.go @@ -102,14 +102,12 @@ func DownloadFileWithSegments( return "", fmt.Errorf("failed to create temporary directory: %w", err) } - defer os.RemoveAll(tempDir) - - downloadedFiles, err := downloadSegments(ctx, segmentURLs, config) + downloadedFiles, err := downloadSegments(ctx, tempDir, segmentURLs, config) if err != nil { os.RemoveAll(tempDir) return "", fmt.Errorf("failed to download segments: %w", err) } - mergedFilePath, err := MergeSegmentFiles(ctx, downloadedFiles, fileName, config) + mergedFilePath, err := mergeSegmentFiles(ctx, downloadedFiles, fileName, config) if err != nil { os.RemoveAll(tempDir) return "", fmt.Errorf("failed to merge segments: %w", err) @@ -406,6 +404,43 @@ func downloadChunkWithRetry( return nil, fmt.Errorf("all %d attempts failed: %w", config.RetryAttempts+1, lastErr) } +func downloadFile( + ctx context.Context, + fileURL string, + filePath string, + config *models.DownloadConfig, +) (string, error) { + if config == nil { + config = DefaultConfig() + } + + reqCtx, cancel := context.WithTimeout(ctx, config.Timeout) + defer cancel() + + req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, fileURL, nil) + if err != nil { + return "", fmt.Errorf("failed to create request: %w", err) + } + resp, err := downloadHTTPSession.Do(req) + if err != nil { + return "", fmt.Errorf("failed to download file: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + file, err := os.Create(filePath) + if err != nil { + return "", fmt.Errorf("failed to create file: %w", err) + } + defer file.Close() + _, err = io.Copy(file, resp.Body) + if err != nil { + return "", fmt.Errorf("failed to write file: %w", err) + } + return filePath, nil +} + func downloadChunk( ctx context.Context, fileURL string, @@ -472,6 +507,7 @@ func createChunks(fileSize int, chunkSize int) [][2]int { func downloadSegments( ctx context.Context, + path string, segmentURLs []string, config *models.DownloadConfig, ) ([]string, error) { @@ -479,20 +515,9 @@ func downloadSegments( config = DefaultConfig() } - tempDir := filepath.Join( - config.DownloadDir, - "segments"+uuid.NewString(), - ) - if err := os.MkdirAll(tempDir, 0755); err != nil { - return nil, fmt.Errorf("failed to create temporary directory: %w", err) - } - defer os.RemoveAll(tempDir) - semaphore := make(chan struct{}, config.Concurrency) var wg sync.WaitGroup - errChan := make(chan error, len(segmentURLs)) - var firstErr atomic.Value downloadedFiles := make([]string, len(segmentURLs)) @@ -526,40 +551,45 @@ func downloadSegments( defer func() { <-semaphore }() segmentFileName := fmt.Sprintf("segment_%05d", idx) - segmentPath := filepath.Join(tempDir, segmentFileName) + segmentPath := filepath.Join(path, segmentFileName) - _, err := DownloadFile(ctx, []string{url}, segmentFileName, &models.DownloadConfig{ - ChunkSize: config.ChunkSize, - Concurrency: 3, // segments are typically small - Timeout: config.Timeout, - DownloadDir: tempDir, - RetryAttempts: config.RetryAttempts, - RetryDelay: config.RetryDelay, - Remux: false, // don't remux individual segments - ProgressUpdater: nil, // no progress updates for individual segments - }) + filePath, err := downloadFile( + ctx, url, segmentPath, + &models.DownloadConfig{ + Timeout: config.Timeout, + }, + ) if err != nil { if firstErr.Load() == nil { firstErr.Store(fmt.Errorf("failed to download segment %d: %w", idx, err)) - cancelDownload() // Cancella tutte le altre download + cancelDownload() } return } - downloadedFiles[idx] = segmentPath + downloadedFiles[idx] = filePath }(i, segmentURL) } + wg.Wait() - go func() { - wg.Wait() - close(errChan) - }() + if err := firstErr.Load(); err != nil { + return nil, err.(error) + } + + for i, file := range downloadedFiles { + if file == "" { + return nil, fmt.Errorf("segment %d was not downloaded", i) + } + if _, err := os.Stat(file); os.IsNotExist(err) { + return nil, fmt.Errorf("segment %d file does not exist: %w", i, err) + } + } return downloadedFiles, nil } -func MergeSegmentFiles( +func mergeSegmentFiles( ctx context.Context, segmentPaths []string, outputFileName string, @@ -569,26 +599,14 @@ func MergeSegmentFiles( config = DefaultConfig() } - if err := EnsureDownloadDir(config.DownloadDir); err != nil { - return "", err - } - outputPath := filepath.Join(config.DownloadDir, outputFileName) outputFile, err := os.Create(outputPath) if err != nil { return "", fmt.Errorf("failed to create output file: %w", err) } - defer func() { - outputFile.Close() - if err != nil { - os.Remove(outputPath) - } - }() + defer outputFile.Close() - bufferedWriter := bufio.NewWriterSize( - outputFile, - 1024*1024, - ) // 1MB buffer + bufferedWriter := bufio.NewWriterSize(outputFile, 1024*1024) // 1MB buffer var totalBytes int64 var processedBytes int64 @@ -606,8 +624,6 @@ func MergeSegmentFiles( select { case <-ctx.Done(): bufferedWriter.Flush() - outputFile.Close() - os.Remove(outputPath) return "", ctx.Err() default: segmentFile, err := os.Open(segmentPath) @@ -615,13 +631,18 @@ func MergeSegmentFiles( return "", fmt.Errorf("failed to open segment %d: %w", i, err) } - written, err := io.Copy(bufferedWriter, segmentFile) + buf := make([]byte, 4*1024*1024) // 4MB buffer + written, err := io.CopyBuffer(bufferedWriter, segmentFile, buf) segmentFile.Close() if err != nil { return "", fmt.Errorf("failed to copy segment %d: %w", i, err) } + if err := bufferedWriter.Flush(); err != nil { + return "", fmt.Errorf("failed to flush after segment %d: %w", i, err) + } + if config.ProgressUpdater != nil && totalBytes > 0 { processedBytes += written progress := float64(processedBytes) / float64(totalBytes) @@ -633,6 +654,7 @@ func MergeSegmentFiles( if err := bufferedWriter.Flush(); err != nil { return "", fmt.Errorf("failed to flush data: %w", err) } + outputFile.Close() if config.Remux { err := av.RemuxFile(outputPath)