分段上传
更新时间: 2019-03-14 10:05
对于较大文件上传,可以切分成段上传。用户可以在如下的应用场景内(但不仅限于此),使用分段上传的模式:
- 上传超过100MB大小的文件。
- 网络条件较差,和OBS服务端之间的链接经常断开。
- 上传前无法确定将要上传文件的大小。
分段上传分为如下3个步骤:
- 初始化分段上传任务(ObsClient.InitiateMultipartUpload)。
- 逐个或并行上传段(ObsClient.UploadPart)。
- 合并段(ObsClient.CompleteMultipartUpload)或取消分段上传任务(ObsClient.AbortMultipartUpload)。
初始化分段上传任务
使用分段上传方式传输数据前,必须先通知OBS初始化一个分段上传任务。该操作会返回一个OBS服务端创建的全局唯一标识(Upload ID),用于标识本次分段上传任务。您可以根据这个唯一标识来发起相关的操作,如取消分段上传任务、列举分段上传任务、列举已上传的段等。
您可以通过ObsClient.InitiateMultipartUpload初始化一个分段上传任务:
- // 引入依赖包
- import (
- "fmt"
- "obs"
- )
- var ak = "*** Provide your Access Key ***"
- var sk = "*** Provide your Secret Key ***"
- var endpoint = "https://your-endpoint"
- // 创建ObsClient结构体
- var obsClient, _ = obs.New(ak, sk, endpoint)
- func main() {
- input := &obs.InitiateMultipartUploadInput{}
- input.Bucket = "bucketname"
- input.Key = "objectkey"
- input.ContentType = "text/plain"
- input.Metadata = map[string]string{"property1": "property-value1", "property2": "property-value2"}
- output, err := obsClient.InitiateMultipartUpload(input)
- if err == nil {
- fmt.Printf("UploadId:%s\n", output.UploadId)
- } else if obsError, ok := err.(obs.ObsError); ok {
- fmt.Printf("Code:%s\n", obsError.Code)
- fmt.Printf("Message:%s\n", obsError.Message)
- }
- }
说明:
- 初始化分段上传任务时,除了指定上传对象的名称和所属桶外,您还可以使用InitiateMultipartUploadInput.ContentType和InitiateMultipartUploadInput.Metadata分别指定对象MIME类型和对象自定义元数据。
- 调用初始化分段上传任务接口成功后,会返回分段上传任务的全局唯一标识(Upload ID),在后面的操作中将用到它。
上传段
初始化一个分段上传任务之后,可以根据指定的对象名和Upload ID来分段上传数据。每一个上传的段都有一个标识它的号码——分段号(Part Number,范围是1~10000)。对于同一个Upload ID,该分段号不但唯一标识这一段数据,也标识了这段数据在整个对象内的相对位置。如果您用同一个分段号上传了新的数据,那么OBS上已有的这个段号的数据将被覆盖。除了最后一段以外,其他段的大小范围是100KB~5GB;最后段大小范围是0~5GB。每个段不需要按顺序上传,甚至可以在不同进程、不同机器上上传,OBS会按照分段号排序组成最终对象。
您可以通过ObsClient.UploadPart上传段:
- // 引入依赖包
- import (
- "fmt"
- "obs"
- )
- var ak = "*** Provide your Access Key ***"
- var sk = "*** Provide your Secret Key ***"
- var endpoint = "https://your-endpoint"
- // 创建ObsClient结构体
- var obsClient, _ = obs.New(ak, sk, endpoint)
- func main() {
- input := &obs.UploadPartInput{}
- input.Bucket = "bucketname"
- input.Key = "objectkey"
- input.UploadId = "uploadid"
- input.SourceFile = "localfile"
- // 第一段分段号
- input.PartNumber = 1
- // 第一段的段偏移量
- input.Offset = 0
- // 第一段分段大小
- input.PartSize = 5 * 1024 * 1024
- // 上传第一段
- output, err := obsClient.UploadPart(input)
- if err == nil {
- // 获取上传成功的ETag值
- fmt.Printf("ETag:%s\n", output.ETag)
- } else if obsError, ok := err.(obs.ObsError); ok {
- fmt.Printf("Code:%s\n", obsError.Code)
- fmt.Printf("Message:%s\n", obsError.Message)
- }
- // 第二段分段号
- input.PartNumber = 2
- // 第二段的段偏移量
- input.Offset = 5 * 1024 * 1024
- // 第二段分段大小
- input.PartSize = 10 * 1024 * 1024
- // 上传第 二段
- output, err = obsClient.UploadPart(input)
- if err == nil {
- // 获取上传成功的ETag值
- fmt.Printf("ETag:%s\n", output.ETag)
- } else if obsError, ok := err.(obs.ObsError); ok {
- fmt.Printf("Code:%s\n", obsError.Code)
- fmt.Printf("Message:%s\n", obsError.Message)
- }
- }
说明:
- 上传段接口要求除最后一段以外,其他的段大小都要大于100KB。但是上传段接口并不会立即校验上传段的大小(因为不知道是否为最后一块);只有调用合并段接口时才会校验。
- OBS会将服务端收到段数据的ETag值(段数据的MD5值)返回给用户。
- 可以通过UploadPartInput.ContentMD5设置上传数据的MD5值,提供给OBS服务端用于校验数据完整性。
- 分段号的范围是1~10000。如果超出这个范围,OBS将返回400 Bad Request错误。
- OBS 3.0的桶支持最小段的大小为100KB,OBS 2.0的桶支持最小段的大小为5MB。请在OBS 3.0的桶上执行分段上传操作。
合并段
所有分段上传完成后,需要调用合并段接口来在OBS服务端生成最终对象。在执行该操作时,需要提供所有有效的分段列表(包括分段号和分段ETag值);OBS收到提交的分段列表后,会逐一验证每个段的有效性。当所有段验证通过后,OBS将把这些分段组合成最终的对象。
您可以通过ObsClient.CompleteMultipartUpload合并段:
- // 引入依赖包
- import (
- "fmt"
- "obs"
- )
- var ak = "*** Provide your Access Key ***"
- var sk = "*** Provide your Secret Key ***"
- var endpoint = "https://your-endpoint"
- // 创建ObsClient结构体
- var obsClient, _ = obs.New(ak, sk, endpoint)
- func main() {
- input := &obs.CompleteMultipartUploadInput{}
- input.Bucket = "bucketname"
- input.Key = "objectkey"
- input.UploadId = "uploadid"
- input.Parts = []obs.Part{
- obs.Part{PartNumber: 1, ETag: "etag1"},
- obs.Part{PartNumber: 2, ETag: "etag2"},
- }
- output, err := obsClient.CompleteMultipartUpload(input)
- if err == nil {
- fmt.Printf("RequestId:%s\n", output.RequestId)
- } else if obsError, ok := err.(obs.ObsError); ok {
- fmt.Printf("Code:%s\n", obsError.Code)
- fmt.Printf("Message:%s\n", obsError.Message)
- }
- }
说明:
分段可以是不连续的。
并发分段上传
分段上传的主要目的是解决大文件上传或网络条件较差的情况。下面的示例代码展示了如何使用分段上传并发上传大文件:
- // 引入依赖包
- import (
- "fmt"
- "obs"
- "os"
- )
- var ak = "*** Provide your Access Key ***"
- var sk = "*** Provide your Secret Key ***"
- var endpoint = "https://your-endpoint"
- var bucketName = "bucketname"
- var objectKey = "objectkey"
- var filePath = "localfile"
- // 创建ObsClient结构体
- var obsClient, _ = obs.New(ak, sk, endpoint)
- func main() {
- // 初始化分段上传任务
- input := &obs.InitiateMultipartUploadInput{}
- input.Bucket = bucketName
- input.Key = objectKey
- output, err := obsClient.InitiateMultipartUpload(input)
- if err != nil {
- panic(err)
- }
- uploadId := output.UploadId
- fmt.Printf("UploadId:%s\n", uploadId)
- fmt.Println()
- // 每段上传100MB
- var partSize int64 = 100 * 1024 * 1024
- stat, err := os.Stat(filePath)
- if err != nil {
- panic(err)
- }
- fileSize := stat.Size()
- // 计算需要上传的段数
- partCount := int(fileSize / partSize)
- if fileSize%partSize != 0 {
- partCount++
- }
- // 执行并发上传段
- partChan := make(chan obs.Part, 5)
- for i := 0; i < partCount; i++ {
- partNumber := i + 1
- offset := int64(i) * partSize
- currPartSize := partSize
- if i+1 == partCount {
- currPartSize = fileSize - offset
- }
- go func() {
- uploadPartInput := &obs.UploadPartInput{}
- uploadPartInput.Bucket = bucketName
- uploadPartInput.Key = objectKey
- uploadPartInput.UploadId = uploadId
- uploadPartInput.SourceFile = filePath
- // 分段号
- uploadPartInput.PartNumber = partNumber
- // 分段在文件中的起始位置
- uploadPartInput.Offset = offset
- // 分段大小
- uploadPartInput.PartSize = currPartSize
- uploadPartInputOutput, err := obsClient.UploadPart(uploadPartInput)
- if err != nil {
- panic(err)
- }
- fmt.Printf("%d finished\n", partNumber)
- partChan <- obs.Part{ETag: uploadPartInputOutput.ETag, PartNumber: uploadPartInputOutput.PartNumber}
- }()
- }
- parts := make([]obs.Part, 0, partCount)
- // 等待上传完成
- for {
- part, ok := <-partChan
- if !ok {
- break
- }
- parts = append(parts, part)
- if len(parts) == partCount {
- close(partChan)
- }
- }
- completeMultipartUploadInput := &obs.CompleteMultipartUploadInput{}
- completeMultipartUploadInput.Bucket = bucketName
- completeMultipartUploadInput.Key = objectKey
- completeMultipartUploadInput.UploadId = uploadId
- completeMultipartUploadInput.Parts = parts
- // 合并段
- completeMultipartUploadOutput, err := obsClient.CompleteMultipartUpload(completeMultipartUploadInput)
- if err != nil {
- panic(err)
- }
- fmt.Printf("RequestId:%s\n", completeMultipartUploadOutput.RequestId)
- }
说明:
大文件分段上传时,使用UploadPartInput.Offset和UploadPartInput.PartSize配合指定每段数据在文件中的起始结束位置。
取消分段上传任务
分段上传任务可以被取消,当一个分段上传任务被取消后,就不能再使用其Upload ID做任何操作,已经上传段也会被OBS删除。
您可以通过ObsClient.AbortMultipartUpload取消分段上传任务:
- // 引入依赖包
- import (
- "fmt"
- "obs"
- )
- var ak = "*** Provide your Access Key ***"
- var sk = "*** Provide your Secret Key ***"
- var endpoint = "https://your-endpoint"
- // 创建ObsClient结构体
- var obsClient, _ = obs.New(ak, sk, endpoint)
- func main() {
- input := &obs.AbortMultipartUploadInput{}
- input.Bucket = "bucketname"
- input.Key = "objectkey"
- input.UploadId = "uploadid"
- output, err := obsClient.AbortMultipartUpload(input)
- if err == nil {
- fmt.Printf("RequestId:%s\n", output.RequestId)
- } else if obsError, ok := err.(obs.ObsError); ok {
- fmt.Printf("Code:%s\n", obsError.Code)
- fmt.Printf("Message:%s\n", obsError.Message)
- }
- }
列举已上传的段
您可使用ObsClient.ListParts列举出某一分段上传任务所有已经上传成功的段。
该接口可设置的参数如下:
参数 | 作用 |
---|---|
ListPartsInput.Bucket | 分段上传任务所属的桶名。 |
ListPartsInput.Key | 分段上传任务所属的对象名。 |
ListPartsInput.UploadId | 分段上传任务全局唯一标识,从ObsClient.InitiateMultipartUpload返回的结果获取。 |
ListPartsInput.MaxParts | 表示列举已上传的段返回结果最大段数目,即分页时每一页中段数目。 |
ListPartsInput.PartNumberMarker | 表示待列出段的起始位置,只有Part Number大于该参数的段会被列出。 |
- 简单列举
- // 引入依赖包
- import (
- "fmt"
- "obs"
- )
- var ak = "*** Provide your Access Key ***"
- var sk = "*** Provide your Secret Key ***"
- var endpoint = "https://your-endpoint"
- // 创建ObsClient结构体
- var obsClient, _ = obs.New(ak, sk, endpoint)
- func main() {
- input := &obs.ListPartsInput{}
- input.Bucket = "bucketname"
- input.Key = "objectkey"
- input.UploadId = "uploadid"
- output, err := obsClient.ListParts(input)
- if err == nil {
- for index, part := range output.Parts {
- fmt.Printf("Part[%d]-ETag:%s, PartNumber:%d, LastModified:%s, Size:%d\n", index, part.ETag,
- part.PartNumber, part.LastModified, part.Size)
- }
- } else if obsError, ok := err.(obs.ObsError); ok {
- fmt.Printf("Code:%s\n", obsError.Code)
- fmt.Printf("Message:%s\n", obsError.Message)
- }
- }
说明:
- 列举段至多返回1000个段信息,如果指定的Upload ID包含的段数量大于1000,则返回结果中ListPartsOutput.IsTruncated为true表明本次没有返回全部段,并可通过ListPartsOutput.NextPartNumberMarker获取下次列举的起始位置。
- 如果想获取指定Upload ID包含的所有分段,可以采用分页列举的方式。
- 列举所有段
由于ObsClient.ListParts只能列举至多1000个段,如果段数量大于1000,列举所有分段请参考如下示例:
- // 引入依赖包
- import (
- "fmt"
- "obs"
- )
- var ak = "*** Provide your Access Key ***"
- var sk = "*** Provide your Secret Key ***"
- var endpoint = "https://your-endpoint"
- // 创建ObsClient结构体
- var obsClient, _ = obs.New(ak, sk, endpoint)
- func main() {
- input := &obs.ListPartsInput{}
- input.Bucket = "bucketname"
- input.Key = "objectkey"
- input.UploadId = "uploadid"
- for {
- output, err := obsClient.ListParts(input)
- if err == nil {
- for index, part := range output.Parts {
- fmt.Printf("Part[%d]-ETag:%s, PartNumber:%d, LastModified:%s, Size:%d\n", index, part.ETag,
- part.PartNumber, part.LastModified, part.Size)
- }
- if output.IsTruncated {
- input.PartNumberMarker = output.NextPartNumberMarker
- } else {
- break
- }
- } else {
- if obsError, ok := err.(obs.ObsError); ok {
- fmt.Printf("Code:%s\n", obsError.Code)
- fmt.Printf("Message:%s\n", obsError.Message)
- }
- break
- }
- }
- }
列举分段上传任务
您可以通过ObsClient.ListMultipartUploads列举分段上传任务。列举分段上传任务可设置的参数如下:
参数 | 作用 |
---|---|
ListMultipartUploadsInput.Bucket | 桶名。 |
ListMultipartUploadsInput.Prefix | 限定返回的分段上传任务中的对象名必须带有Prefix前缀。 |
ListMultipartUploadsInput.MaxUploads | 列举分段上传任务的最大数目,取值范围为1~1000,当超出范围时,按照默认的1000进行处理。 |
ListMultipartUploadsInput.Delimiter | 用于对分段上传任务中的对象名进行分组的字符。对于对象名中包含Delimiter的任务,其对象名(如果请求中指定了Prefix,则此处的对象名需要去掉Prefix)中从首字符至第一个Delimiter之间的字符串将作为一个分组并作为CommonPrefix返回。 |
ListMultipartUploadsInput.KeyMarker | 表示列举时返回指定的KeyMarker之后的分段上传任务。 |
ListMultipartUploadsInput.UploadIdMarker | 只有与KeyMarker参数一起使用时才有意义,用于指定返回结果的起始位置,即列举时返回指定KeyMarker的UploadIdMarker之后的分段上传任务。 |
- 简单列举分段上传任务
- // 引入依赖包
- import (
- "fmt"
- "obs"
- )
- var ak = "*** Provide your Access Key ***"
- var sk = "*** Provide your Secret Key ***"
- var endpoint = "https://your-endpoint"
- // 创建ObsClient结构体
- var obsClient, _ = obs.New(ak, sk, endpoint)
- func main() {
- input := &obs.ListMultipartUploadsInput{}
- input.Bucket = "bucketname"
- output, err := obsClient.ListMultipartUploads(input)
- if err == nil {
- for index, upload := range output.Uploads {
- fmt.Printf("Upload[%d]-OwnerId:%s, UploadId:%s, Key:%s, Initiated:%s,StorageClass:%s\n",
- index, upload.Owner.ID, upload.UploadId, upload.Key, upload.Initiated, upload.StorageClass)
- }
- } else if obsError, ok := err.(obs.ObsError); ok {
- fmt.Printf("Code:%s\n", obsError.Code)
- fmt.Printf("Message:%s\n", obsError.Message)
- }
- }
说明:
- 列举分段上传任务至多返回1000个任务信息,如果指定的桶包含的分段上传任务数量大于1000,则返回结果中ListMultipartUploadsOutput.IsTruncated为true表明本次没有返回全部结果,并可通过ListMultipartUploadsOutput.NextKeyMarker和ListMultipartUploadsOutput.NextUploadIdMarker获取下次列举的起点。
- 如果想获取指定桶包含的所有分段上传任务,可以采用分页列举的方式。
- 列举全部分段上传任务
- // 引入依赖包
- import (
- "fmt"
- "obs"
- )
- var ak = "*** Provide your Access Key ***"
- var sk = "*** Provide your Secret Key ***"
- var endpoint = "https://your-endpoint"
- // 创建ObsClient结构体
- var obsClient, _ = obs.New(ak, sk, endpoint)
- func main() {
- input := &obs.ListMultipartUploadsInput{}
- input.Bucket = "bucketname"
- output, err := obsClient.ListMultipartUploads(input)
- for {
- if err == nil {
- for index, upload := range output.Uploads {
- fmt.Printf("Upload[%d]-OwnerId:%s, UploadId:%s, Key:%s, Initiated:%s,StorageClass:%s\n",
- index, upload.Owner.ID, upload.UploadId, upload.Key, upload.Initiated, upload.StorageClass)
- }
- if output.IsTruncated {
- input.KeyMarker = output.NextKeyMarker
- input.UploadIdMarker = output.NextUploadIdMarker
- } else {
- break
- }
- } else {
- if obsError, ok := err.(obs.ObsError); ok {
- fmt.Printf("Code:%s\n", obsError.Code)
- fmt.Printf("Message:%s\n", obsError.Message)
- }
- break
- }
- }
- }
父主题:上传对象