之前在Github社区偶然发现一个Go实现的下载器,作为个人学习的一个参照模板,这里要感谢一下作者monkeyWie(网名)的开源项目gopeed-core

前言

之前在Github社区偶然发现一个Go实现的下载器,由于此前对于这方面没有研究,抱着了解其内部实现原理的好奇心,拉取了相关开源代码,期间进行调试与分析,作为个人学习的一个参照模板,这里要感谢一下作者monkeyWie(网名)的开源项目gopeed-core

思路

在初步浏览梳理相关代码逻辑之后,大概总结下下载器的主要流程如下:

  1. 发起请求获取下载链接的文件名、文件大小、是否支持“断点续传”等状态。
  2. 支持断点续传?
    • 支持,则依据并发数量,为每个协程独立分配下载任务区间
    • 不支持,单协程同步下载
  3. 启动并发下载,由HTTP请求header获取文件字节区间,分而治之。

相关技术点

下面先列举项目一些引用到的基础知识,熟悉用法之后再来聊聊项目中的具体应用实现。

ErrGroup

在Go原生的sync包中有一个常用的结构,sync.WaitGroup,在并发编程中经常会使用到,其中的wg.Wait()用于通知调用处进行等待,等待所有期待的任务执行wg.Done(),方便我们实现一个阻塞等待的框架。
在它之后衍生出一个sync.ErrGroup包,可以在wait()处返回任务执行返回的error

  • 等待子任务完成
  • catch子任务err并决定做何处理

在将一个任务分解为多块并分治的场景十分适合这种设计模型,类似于MapReduce或者称为‘scatter and gather’。 比如分段下载、分页查找等。

when error occurs, context cancelled, so use select will fine, “scatter and gather”(分治)

使用方法:
errGroup通常伴随一个errCtx上下文出现,用于通知同一个组内的协程,处理逻辑可以由代码控制。

// 创建一个errGroup,以及一个errCtx上下文
eg, errCtx := errgroup.WithContext(context.Background())

使用eg.Go()启动,传入签名为func() error的函数作为参数:

eg.Go(func() error {
    // 任意业务错误
	return errors.New("egError")
})

内部分析:
观察源码可以知道出错时会调用g.cancel(),对应着上面返回err上下文的errCtx.Done()

// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
	g.wg.Add(1)

	go func() {
		defer g.wg.Done()
        // 如果任务返回err,显式调用g绑定的cancel()函数
		if err := f(); err != nil {
			g.errOnce.Do(func() {
			    // 这里的err不会有shadowed,即只上报第一个错误
				g.err = err
				if g.cancel != nil {
				    // 触发errCtx.Done()
					g.cancel()
				}
			})
		}
	}()
}

示例demo:

const (
	CON = 10
)

func main() {
    var eg *errgroup.Group
	eg, errCtx := errgroup.WithContext(context.Background())
	for i := 0; i < CON; i++ {
		i := i
		eg.Go(func() error {
			if i == 3 {
				return errors.New(fmt.Sprintf("Mock err: %d", i))
			}
			select {
			// 让Done分支判断在Mock err出现后命中, 等待1s
			case <-time.After(time.Duration(1) * time.Second):
			case <-errCtx.Done():
			    fmt.Println("meet err in job:", i)
				return errCtx.Err()
			}
			fmt.Println(i)
			return nil
		})
	}

	// wait for done or err occurs
	if err := eg.Wait(); err == nil {
		log.Print("all looks good.")
	} else {
		log.Printf("errors occurs: %v", err)
	}
	return
}

输出:
可以看到,在组内i等于3的协程抛出错误之后,组内其他协程errCtx.Done()分支都命中了,所以它们直接返回errGroup捕获的第一个错误

2021/06/24 07:24:55 meet err in job: 9
2021/06/24 07:24:55 meet err in job: 1
2021/06/24 07:24:55 meet err in job: 4
2021/06/24 07:24:55 meet err in job: 2
2021/06/24 07:24:55 meet err in job: 0
2021/06/24 07:24:55 meet err in job: 7
2021/06/24 07:24:55 meet err in job: 8
2021/06/24 07:24:55 meet err in job: 5
2021/06/24 07:24:55 meet err in job: 6
2021/06/24 07:24:55 errors occurs: Mock err: 3

HTTP Header参数

HTTP/1.1

区别于HTTP1.0,HTTP/1.1除了提供了长连接的功能之外,还有一个升级点就是在HTTP1.1之后开启了断点续传功能,即请求端可以通过Range标签只请求资源的某个部分。

Header参数

名词解释 含义
Range 设置请求文件字节区间,例:Range:bytes=0-1023,表示前1024个字节
Content-Range Content-Range:bytes 0-1023/2047,服务器返回Header,表示此次请求区间,以及文件总大小。
状态码:206 使用Range请求文件区间之后,服务端一般会返回206表示支持断点续传
Content-Disposition 正文描述,本例用于获取文件名(也可从URL连接进行解析)

示例demo:

  • 启动本地文件服务器
// 启动本地文件服务器,打开127.0.0.1:8899可以看到共享文件列表
http.Handle("/", http.FileServer(http.Dir("D:\\测试下载\\serverFile")))
if err := http.ListenAndServe(":8899", nil); err != nil {
	fmt.Println("err:", err)
}
  • 相关常量
// basic/downloader/mdl/constansts.go
const (
	HttpCodeOK             = 200
	HttpCodePartialContent = 206

	HttpHeaderRange              = "Range"
	HttpHeaderContentLength      = "Content-Length"
	HttpHeaderContentRange       = "Content-Range"
	HttpHeaderContentDisposition = "Content-Disposition"

	HttpHeaderRangeFormat = "bytes=%d-%d"
)
  • 解析目标文件信息(获取文件大小/是否支持断点续传)
const (
	DOWNLOAD_URL = "http://127.0.0.1:8899/demo.zip"
	SAVE_PATH    = "D:\\测试下载\\download"
	CON          = 8
	RETRY_COUNT  = 5
)

// ...

// 构造请求并设置header
req, err := buildReq(ctx, reqURL)
if err != nil {
	return nil, err
}

// 只访问一个字节,测试资源是否支持Range请求
req.Header.Set(mdl.HttpHeaderRange, fmt.Sprintf(mdl.HttpHeaderRangeFormat, 0, 0))
resp, err := http.DefaultClient.Do(req)

// 解析是否支持断点续传/文件大小
if resp.StatusCode == mdl.HttpCodePartialContent {
	// 支持断点下载
	res.Range = true
	// 从Content-Range:bytes 0-0/137783533 获取大小
	totalSize := path.Base(resp.Header.Get(mdl.HttpHeaderContentRange))
}

文件读写

这部分是下载过程,主要是文件的操作

  • 文件创建
    在上一个初始化下载链接之后拿到文件大小了,所以可以预创建文件并分配总占用字节数。

    func touch(fileName string, size int64) (file *os.File, err error) {
        // 创建文件
        file, err = os.Create(fileName)
        if size > 0 {
            // 分配大小
            err := os.Truncate(fileName, size)
            if err != nil {
                return nil, err
            }
        }
        return
    }
    
  • 文件写入
    在并发数大于0以及文件协议支持断点续传的前提下,开启多协程并发下载,这里引出了一个困惑。
    核心问题: 多协程并发写同一个文件,如何避免加锁提高效率?

    笔者在刚开始拉取项目代码的时候,作者已经优化过并提供了最终方案了,我当时“先入为主”地忽视了这个问题,之后在看commit记录和issue列表的时候才觉察最初这部分作者是做了互斥锁的。

    部分代码如下: 互斥锁代码

    之后回想其实并发操作文件并无影响,因为我们在刚开始就创建文件并分配总大小,后续子协程只需要根据自己负责的文件字节下标区间去写入就好了,这里与写顺序无关,因此其实不需要加锁。(笔者后来也找作者再次确认这个猜想)

    最终作者实现的代码如下,
    定义每个文件切分的块,下标用于向服务器申请文件区间:

    type Chunk struct {
        Status Status       // 文件块下载状态
        Begin  int64        // 开始下标
        End    int64        // 结束下标
        Downloaded int64    // 当前文件块已下载字节, 用于文件追加或者出错断点续传
    }
    

    定义文件块数组并分配区间:

    var (
        // 切分文件块
        chunks []*mdl.Chunk
        // 切分块数
        ckTol int
    )
    // 支持切分
    if res.Range {
        // 并发8个协程下载
        ckTol = 8
        chunks = make([]*mdl.Chunk, ckTol)
        partSize := res.TotalSize / int64(ckTol)
        for i := 0; i < ckTol; i++ {
            var (
                begin = partSize * int64(i)
                end   int64
            )
            if i == (ckTol - 1) {
                end = res.TotalSize - 1
            } else {
                end = begin + partSize - 1
            }
            ck := mdl.NewChunk(begin, end)
            chunks[i] = ck
        }
    } else {
        ckTol = 1
        // 单连接下载
        chunks = make([]*mdl.Chunk, ckTol)
        chunks[0] = mdl.NewChunk(0, 0)
    }
    

    下载细节:

    func fetch(ctx context.Context, res *mdl.Resource,
        file *os.File, chks []*mdl.Chunk, c int, doneCh chan error) {
        // 使用errgroup捕获异常并通知外部doneCh通道
        eg, _ := errgroup.WithContext(ctx)
        for i := 0; i < c; i++ {
            // 注意闭包写法
            i := i
            eg.Go(func() error {
                // 并发下载
                return fetchChunk(ctx, res, file, i, chks)
            })
        }
      
        go func() {
            // error from errgroup
            err := eg.Wait()
            // 关闭文件
            file.Close()
            // 接收fetchChunk()内部状态
            doneCh <- err
        }()
        return
    }
      
    func fetchChunk(ctx context.Context, res *mdl.Resource, file *os.File, index int, chks []*mdl.Chunk) (err error) {
        ck := chks[index]
        req, err := buildReq(ctx, DOWNLOAD_URL)
        if err != nil {
            return err
        }
        var (
            client = http.DefaultClient
            buf    = make([]byte, 8192)
        )
        /**************重试区间开始**************/
        // 根据是否分块下载设置header
        //	- 根据当前chunk设置文件区间到header
        //	- 判断请求返回status
        //		- 失败: 少于5次则做重试
        //		- 成功:
        //				根据offset, 把buf写入文件
        //				- 成功: return, 通知外部
        //				- 失败: 重试少于5次, 返回重试
        for i := 0; i < RETRY_COUNT; i++ {
            var resp *http.Response
            if res.Range {
                req.Header.Set(mdl.HttpHeaderRange,
                    fmt.Sprintf(mdl.HttpHeaderRangeFormat, chks[index].Begin+ck.Downloaded, chks[index].End))
            } else {
                // 单连接重试没有断点续传
                ck.Downloaded = 0
            }
      
            // 获取字节区间
            if err := func() error {
                resp, err = client.Do(req)
                if err != nil {
                    return err
                }
                if resp.StatusCode != mdl.HttpCodeOK && resp.StatusCode != mdl.HttpCodePartialContent {
                    return errors.New(fmt.Sprintf("%d,%s", resp.StatusCode, resp.Status))
                }
                return nil
            }(); err != nil {
                continue
            }
      
            // 从body提取buf写入文件, 重置重试标识
            i = 0
            retry := false
            retry, err = func() (bool, error) {
                defer resp.Body.Close()
                for {
                    n, err := resp.Body.Read(buf)
                    if n > 0 {
                        // 每次写入从已下载位置开始写,保证字节顺序
                        _, err := file.WriteAt(buf[:n], ck.Begin+ck.Downloaded)
                        if err != nil {
                            // 文件出错不重试
                            return false, err
                        }
                        // 记录已下载, 用于断点续传
                        ck.Downloaded += int64(n)
                    }
                    if err != nil {
                        // err from read
                        if err != io.EOF {
                            return true, err
                        }
                        break
                    }
                }
                // success exit
                return false, nil
            }()
            if !retry {
                break
            }
        }
        /**************重试区间结束**************/
      
        // 通知外部
        return
    }
      
    

    其实代码逻辑相对简单,只是加多了错误重试所以读起来有点绕,关键点在于ck.Downloaded字段的维护更新,无论是正常下载还是失败重试,都需要严格保证ck.Downloaded当前值的准确性。

代码分析

笔者对原版代码进行调整和部分核心功能的抽取,以求快速理解其下载思路,作者使用了较常用的工厂和适配器模式,后续如果有机会可以展开分析,再次感谢作者monkeyWie提供的轮子。

参考地址

Coordinating goroutines — errGroup
https://levelup.gitconnected.com/coordinating-goroutines-errgroup-c78bb5d80232
monkeyWie/gopeed-core
https://github.com/monkeyWie/gopeed-core