Compare commits

2 commits: d565396c28 ... 84c88c49ae

| Author | SHA1 | Date |
| --- | --- | --- |
|  | 84c88c49ae |  |
|  | 0d986d4573 |  |

2 changed files with 136 additions and 113 deletions
@@ -173,17 +173,22 @@ func GetTweetAPI(
     if resp.StatusCode != http.StatusOK {
         return nil, fmt.Errorf("invalid response code: %s", resp.Status)
     }
 
+    body, err := io.ReadAll(resp.Body)
+    if err != nil {
+        return nil, fmt.Errorf("failed to read body: %w", err)
+    }
+
     var apiResponse APIResponse
-    decoder := sonic.ConfigFastest.NewDecoder(resp.Body)
-    err = decoder.Decode(&apiResponse)
+    err = sonic.ConfigFastest.Unmarshal(body, &apiResponse)
     if err != nil {
         return nil, fmt.Errorf("failed to parse response: %w", err)
     }
 
     result := apiResponse.Data.TweetResult.Result
     if result == nil {
         return nil, errors.New("failed to get tweet result")
     }
 
     var tweet *Tweet
     if result.Tweet != nil {
         tweet = result.Tweet
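Note on the hunk above: the change drops sonic's streaming decoder in favor of reading the whole body with io.ReadAll and parsing it with sonic.ConfigFastest.Unmarshal. A minimal, self-contained sketch of that parse path, assuming a simplified stand-in for APIResponse (the real struct in this repo has more fields):

package main

import (
    "fmt"
    "io"
    "strings"

    "github.com/bytedance/sonic"
)

// simplified stand-in for the repo's APIResponse / tweet types
type tweetResult struct {
    Text string `json:"text"`
}

func main() {
    // stand-in for resp.Body
    respBody := strings.NewReader(`{"text":"hello"}`)

    // read the whole body once, then unmarshal from the byte slice,
    // mirroring the switch away from a streaming decoder
    body, err := io.ReadAll(respBody)
    if err != nil {
        panic(err)
    }

    var out tweetResult
    if err := sonic.ConfigFastest.Unmarshal(body, &out); err != nil {
        panic(err)
    }
    fmt.Println(out.Text)
}

One practical effect of reading the body first is that the raw bytes stay available for error reporting or a second parse; a one-shot decoder over resp.Body does not allow that.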
util/download.go (240 changes)
@@ -55,7 +55,6 @@ func DownloadFile(
     case <-ctx.Done():
         return "", ctx.Err()
     default:
-        // create the download directory if it doesn't exist
         if err := EnsureDownloadDir(config.DownloadDir); err != nil {
             return "", err
         }
@@ -184,26 +183,35 @@ func downloadInMemory(
         return nil, fmt.Errorf("file too large for in-memory download: %d bytes", resp.ContentLength)
     }
 
-    var bufPool = sync.Pool{
-        New: func() any {
-            return bytes.NewBuffer(make([]byte, 0, 1024*1024))
-        },
-    }
-
-    buf := bufPool.Get().(*bytes.Buffer)
-    buf.Reset()
-    defer bufPool.Put(buf)
+    // allocate a single buffer with the
+    // correct size upfront to prevent reallocations
+    var data []byte
 
     if resp.ContentLength > 0 {
-        buf.Grow(int(resp.ContentLength))
+        data = make([]byte, 0, resp.ContentLength)
+    } else {
+        // 64KB initial capacity
+        data = make([]byte, 0, 64*1024)
     }
 
-    _, err = io.Copy(buf, resp.Body)
-    if err != nil {
-        return nil, fmt.Errorf("failed to read response body: %w", err)
+    // use a limited reader to prevent
+    // exceeding memory limits even if content-length is wrong
+    limitedReader := io.LimitReader(resp.Body, int64(config.MaxInMemory))
+
+    buf := make([]byte, 32*1024) // 32KB buffer
+    for {
+        n, err := limitedReader.Read(buf)
+        if n > 0 {
+            data = append(data, buf[:n]...)
+        }
+        if err == io.EOF {
+            break
+        }
+        if err != nil {
+            return nil, fmt.Errorf("failed to read response body: %w", err)
+        }
     }
 
-    return buf.Bytes(), nil
+    return data, nil
 }
 
 func EnsureDownloadDir(dir string) error {
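The rewritten downloadInMemory above drops the sync.Pool-backed bytes.Buffer and instead appends into a plain byte slice behind io.LimitReader, so a missing or wrong Content-Length cannot grow memory past config.MaxInMemory. A standard-library-only sketch of that bounded read loop; readAllLimited and the example reader are illustrative names, not code from this repo:

package main

import (
    "bytes"
    "fmt"
    "io"
)

// readAllLimited mirrors the bounded read pattern: a fixed copy buffer,
// appends into a pre-sized slice, and a hard cap enforced by io.LimitReader.
func readAllLimited(r io.Reader, maxBytes int64) ([]byte, error) {
    limited := io.LimitReader(r, maxBytes)

    data := make([]byte, 0, 64*1024) // 64KB initial capacity
    buf := make([]byte, 32*1024)     // fixed 32KB copy buffer
    for {
        n, err := limited.Read(buf)
        if n > 0 {
            data = append(data, buf[:n]...)
        }
        if err == io.EOF {
            break
        }
        if err != nil {
            return nil, fmt.Errorf("read failed: %w", err)
        }
    }
    return data, nil
}

func main() {
    src := bytes.NewReader(bytes.Repeat([]byte("x"), 100))
    out, err := readAllLimited(src, 1<<20)
    fmt.Println(len(out), err) // 100 <nil>
}

io.LimitReader reports io.EOF once the cap is hit, so an oversized body comes back truncated at the limit rather than as an error.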
@@ -252,12 +260,15 @@ func runChunkedDownload(
         }
     }
 
-    chunks := createChunks(fileSize, config.ChunkSize)
+    numChunks := 1
+    if fileSize > 0 {
+        numChunks = int(math.Ceil(float64(fileSize) / float64(config.ChunkSize)))
+    }
 
     semaphore := make(chan struct{}, config.Concurrency)
     var wg sync.WaitGroup
 
-    errChan := make(chan error, len(chunks))
+    errChan := make(chan error, numChunks)
     var downloadErr error
     var errOnce sync.Once
 
@@ -267,12 +278,22 @@ func runChunkedDownload(
     downloadCtx, cancelDownload := context.WithCancel(ctx)
     defer cancelDownload()
 
-    for idx, chunk := range chunks {
+    // use a mutex to synchronize file access
+    var fileMutex sync.Mutex
+
+    for i := range numChunks {
         wg.Add(1)
 
-        go func(idx int, chunk [2]int) {
+        go func(chunkIndex int) {
             defer wg.Done()
 
+            // calculate chunk bounds
+            start := chunkIndex * config.ChunkSize
+            end := start + config.ChunkSize - 1
+            if end >= fileSize && fileSize > 0 {
+                end = fileSize - 1
+            }
+
             // respect concurrency limit
             select {
             case semaphore <- struct{}{}:
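With createChunks gone, each goroutine above derives its own byte range from the chunk index. A small worked sketch of that bounds arithmetic; the 10 MiB file size and 4 MiB chunk size are arbitrary example values, not defaults from this repo:

package main

import (
    "fmt"
    "math"
)

func main() {
    fileSize := 10 * 1024 * 1024 // 10 MiB, example only
    chunkSize := 4 * 1024 * 1024 // 4 MiB, example only

    numChunks := int(math.Ceil(float64(fileSize) / float64(chunkSize)))
    for i := 0; i < numChunks; i++ {
        // same bounds math as the goroutine above
        start := i * chunkSize
        end := start + chunkSize - 1
        if end >= fileSize {
            end = fileSize - 1
        }
        fmt.Printf("chunk %d: bytes=%d-%d\n", i, start, end)
    }
}

This prints bytes=0-4194303, bytes=4194304-8388607 and bytes=8388608-10485759, the same inclusive ranges later sent in the Range header.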
@@ -281,36 +302,31 @@ func runChunkedDownload(
                 return
             }
 
-            chunkData, err := downloadChunkWithRetry(downloadCtx, fileURL, chunk, config)
+            err := downloadChunkToFile(
+                downloadCtx, fileURL,
+                file, start, end,
+                config, &fileMutex,
+            )
             if err != nil {
                 errOnce.Do(func() {
-                    downloadErr = fmt.Errorf("chunk %d: %w", idx, err)
+                    downloadErr = fmt.Errorf("chunk %d: %w", chunkIndex, err)
                     cancelDownload() // cancel all other downloads
                     errChan <- downloadErr
                 })
                 return
             }
 
-            if err := writeChunkToFile(file, chunkData, chunk[0]); err != nil {
-                errOnce.Do(func() {
-                    downloadErr = fmt.Errorf("failed to write chunk %d: %w", idx, err)
-                    cancelDownload()
-                    errChan <- downloadErr
-                })
-                return
-            }
-
             // update progress
-            chunkSize := chunk[1] - chunk[0] + 1
+            chunkSize := end - start + 1
             completedChunks.Add(1)
             completedBytes.Add(int64(chunkSize))
-            progress := float64(completedBytes.Load()) / float64(fileSize)
-            // report progress if handler exists
-            if config.ProgressUpdater != nil {
-                config.ProgressUpdater(progress)
+            if fileSize > 0 {
+                progress := float64(completedBytes.Load()) / float64(fileSize)
+                if config.ProgressUpdater != nil {
+                    config.ProgressUpdater(progress)
+                }
             }
-        }(idx, chunk)
+        }(i)
     }
 
     done := make(chan struct{})
@@ -352,7 +368,11 @@ func runChunkedDownload(
     return nil
 }
 
-func getFileSize(ctx context.Context, fileURL string, timeout time.Duration) (int, error) {
+func getFileSize(
+    ctx context.Context,
+    fileURL string,
+    timeout time.Duration,
+) (int, error) {
     reqCtx, cancel := context.WithTimeout(ctx, timeout)
     defer cancel()
 
@@ -374,12 +394,15 @@ func getFileSize(ctx context.Context, fileURL string, timeout time.Duration) (in
     return int(resp.ContentLength), nil
 }
 
-func downloadChunkWithRetry(
+func downloadChunkToFile(
     ctx context.Context,
     fileURL string,
-    chunk [2]int,
+    file *os.File,
+    start int,
+    end int,
     config *models.DownloadConfig,
-) ([]byte, error) {
+    fileMutex *sync.Mutex,
+) error {
     var lastErr error
 
     for attempt := 0; attempt <= config.RetryAttempts; attempt++ {
@@ -387,20 +410,72 @@ func downloadChunkWithRetry(
             // wait before retry
             select {
             case <-ctx.Done():
-                return nil, ctx.Err()
+                return ctx.Err()
             case <-time.After(config.RetryDelay):
             }
         }
 
-        data, err := downloadChunk(ctx, fileURL, chunk, config.Timeout)
+        err := downloadAndWriteChunk(
+            ctx, fileURL, file,
+            start, end, config.Timeout,
+            fileMutex,
+        )
         if err == nil {
-            return data, nil
+            return nil
         }
 
         lastErr = err
     }
 
-    return nil, fmt.Errorf("all %d attempts failed: %w", config.RetryAttempts+1, lastErr)
+    return fmt.Errorf("all %d attempts failed: %w", config.RetryAttempts+1, lastErr)
+}
+
+func downloadAndWriteChunk(
+    ctx context.Context,
+    fileURL string,
+    file *os.File,
+    start int,
+    end int,
+    timeout time.Duration,
+    fileMutex *sync.Mutex,
+) error {
+    reqCtx, cancel := context.WithTimeout(ctx, timeout)
+    defer cancel()
+
+    req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, fileURL, nil)
+    if err != nil {
+        return fmt.Errorf("failed to create request: %w", err)
+    }
+
+    req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", start, end))
+
+    resp, err := downloadHTTPSession.Do(req)
+    if err != nil {
+        return fmt.Errorf("download failed: %w", err)
+    }
+    defer resp.Body.Close()
+
+    if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK {
+        return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+    }
+
+    // use a fixed-size buffer for
+    // copying to avoid large allocations (32KB)
+    buf := make([]byte, 32*1024)
+
+    fileMutex.Lock()
+    defer fileMutex.Unlock()
+
+    if _, err := file.Seek(int64(start), io.SeekStart); err != nil {
+        return fmt.Errorf("failed to seek file: %w", err)
+    }
+
+    _, err = io.CopyBuffer(file, resp.Body, buf)
+    if err != nil {
+        return fmt.Errorf("failed to write chunk data: %w", err)
+    }
+
+    return nil
 }
 
 func downloadFile(
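downloadAndWriteChunk above guards the shared *os.File with a mutex because Seek moves the file's single offset before io.CopyBuffer streams the body into place. A minimal sketch of that seek-then-copy pattern in isolation; writeAtOffset and the temp file are illustrative, not names from this repo:

package main

import (
    "fmt"
    "io"
    "os"
    "strings"
    "sync"
)

// writeAtOffset serializes Seek + CopyBuffer on a shared file, the same
// discipline the chunk writer uses while holding fileMutex.
func writeAtOffset(f *os.File, mu *sync.Mutex, offset int64, src io.Reader) error {
    buf := make([]byte, 32*1024) // fixed 32KB copy buffer

    mu.Lock()
    defer mu.Unlock()

    if _, err := f.Seek(offset, io.SeekStart); err != nil {
        return fmt.Errorf("seek: %w", err)
    }
    if _, err := io.CopyBuffer(f, src, buf); err != nil {
        return fmt.Errorf("copy: %w", err)
    }
    return nil
}

func main() {
    f, err := os.CreateTemp("", "chunk-demo-*")
    if err != nil {
        panic(err)
    }
    defer os.Remove(f.Name())
    defer f.Close()

    var mu sync.Mutex
    if err := writeAtOffset(f, &mu, 4, strings.NewReader("world")); err != nil {
        panic(err)
    }
    fmt.Println("wrote chunk at offset 4 in", f.Name())
}

The removed writeChunkToFile used os.File.WriteAt with an explicit offset instead; the new approach streams the response body straight into the file, at the cost of holding the lock for the duration of the copy.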
@@ -421,85 +496,28 @@ func downloadFile(
     return "", fmt.Errorf("failed to download file: %w", err)
     }
     defer resp.Body.Close()
 
     if resp.StatusCode != http.StatusOK {
         return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
     }
 
     file, err := os.Create(filePath)
     if err != nil {
         return "", fmt.Errorf("failed to create file: %w", err)
     }
     defer file.Close()
-    _, err = io.Copy(file, resp.Body)
+
+    // use a fixed-size buffer for
+    // copying to avoid large allocations (32KB)
+    buf := make([]byte, 32*1024)
+    _, err = io.CopyBuffer(file, resp.Body, buf)
     if err != nil {
         return "", fmt.Errorf("failed to write file: %w", err)
     }
 
     return filePath, nil
 }
 
-func downloadChunk(
-    ctx context.Context,
-    fileURL string,
-    chunk [2]int,
-    timeout time.Duration,
-) ([]byte, error) {
-    reqCtx, cancel := context.WithTimeout(ctx, timeout)
-    defer cancel()
-
-    req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, fileURL, nil)
-    if err != nil {
-        return nil, fmt.Errorf("failed to create request: %w", err)
-    }
-    req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", chunk[0], chunk[1]))
-
-    resp, err := downloadHTTPSession.Do(req)
-    if err != nil {
-        return nil, fmt.Errorf("download failed: %w", err)
-    }
-    defer resp.Body.Close()
-
-    if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK {
-        return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
-    }
-
-    var buf bytes.Buffer
-    if resp.ContentLength > 0 {
-        buf.Grow(int(resp.ContentLength))
-    } else {
-        buf.Grow(chunk[1] - chunk[0] + 1)
-    }
-    _, err = io.Copy(&buf, resp.Body)
-    if err != nil {
-        return nil, fmt.Errorf("failed to read chunk data: %w", err)
-    }
-
-    return buf.Bytes(), nil
-}
-
-func writeChunkToFile(file *os.File, data []byte, offset int) error {
-    _, err := file.WriteAt(data, int64(offset))
-    return err
-}
-
-func createChunks(fileSize int, chunkSize int) [][2]int {
-    if fileSize <= 0 {
-        return [][2]int{{0, 0}}
-    }
-
-    numChunks := int(math.Ceil(float64(fileSize) / float64(chunkSize)))
-    chunks := make([][2]int, numChunks)
-
-    for i := range chunks {
-        start := i * chunkSize
-        end := start + chunkSize - 1
-        if end >= fileSize {
-            end = fileSize - 1
-        }
-        chunks[i] = [2]int{start, end}
-    }
-
-    return chunks
-}
-
 func downloadSegments(
     ctx context.Context,
     path string,