分段上传

更新时间: 2019-03-14 10:05

对于较大文件上传,可以切分成段上传。用户可以在如下的应用场景内(但不仅限于此),使用分段上传的模式:

  • 上传超过100MB大小的文件。
  • 网络条件较差,和OBS服务端之间的链接经常断开。
  • 上传前无法确定将要上传文件的大小。

分段上传分为如下3个步骤:

  • 初始化分段上传任务(ObsClient.InitiateMultipartUpload)。
  • 逐个或并行上传段(ObsClient.UploadPart)。
  • 合并段(ObsClient.CompleteMultipartUpload)或取消分段上传任务(ObsClient.AbortMultipartUpload)。

初始化分段上传任务

使用分段上传方式传输数据前,必须先通知OBS初始化一个分段上传任务。该操作会返回一个OBS服务端创建的全局唯一标识(Upload ID),用于标识本次分段上传任务。您可以根据这个唯一标识来发起相关的操作,如取消分段上传任务、列举分段上传任务、列举已上传的段等。

您可以通过ObsClient.InitiateMultipartUpload初始化一个分段上传任务:

  1. // 引入依赖包
  2. import (
  3. "fmt"
  4. "obs"
  5. )
  6.  
  7. var ak = "*** Provide your Access Key ***"
  8. var sk = "*** Provide your Secret Key ***"
  9. var endpoint = "https://your-endpoint"
  10.  
  11. // 创建ObsClient结构体
  12. var obsClient, _ = obs.New(ak, sk, endpoint)
  13.  
  14. func main() {
  15. input := &obs.InitiateMultipartUploadInput{}
  16. input.Bucket = "bucketname"
  17. input.Key = "objectkey"
  18. input.ContentType = "text/plain"
  19. input.Metadata = map[string]string{"property1": "property-value1", "property2": "property-value2"}
  20. output, err := obsClient.InitiateMultipartUpload(input)
  21. if err == nil {
  22. fmt.Printf("UploadId:%s\n", output.UploadId)
  23. } else if obsError, ok := err.(obs.ObsError); ok {
  24. fmt.Printf("Code:%s\n", obsError.Code)
  25. fmt.Printf("Message:%s\n", obsError.Message)
  26. }
  27. }

分段上传 - 图1 说明:

  • 初始化分段上传任务时,除了指定上传对象的名称和所属桶外,您还可以使用InitiateMultipartUploadInput.ContentType和InitiateMultipartUploadInput.Metadata分别指定对象MIME类型和对象自定义元数据。
  • 调用初始化分段上传任务接口成功后,会返回分段上传任务的全局唯一标识(Upload ID),在后面的操作中将用到它。

上传段

初始化一个分段上传任务之后,可以根据指定的对象名和Upload ID来分段上传数据。每一个上传的段都有一个标识它的号码——分段号(Part Number,范围是1~10000)。对于同一个Upload ID,该分段号不但唯一标识这一段数据,也标识了这段数据在整个对象内的相对位置。如果您用同一个分段号上传了新的数据,那么OBS上已有的这个段号的数据将被覆盖。除了最后一段以外,其他段的大小范围是100KB~5GB;最后段大小范围是0~5GB。每个段不需要按顺序上传,甚至可以在不同进程、不同机器上上传,OBS会按照分段号排序组成最终对象。

您可以通过ObsClient.UploadPart上传段:

  1. // 引入依赖包
  2. import (
  3. "fmt"
  4. "obs"
  5. )
  6.  
  7. var ak = "*** Provide your Access Key ***"
  8. var sk = "*** Provide your Secret Key ***"
  9. var endpoint = "https://your-endpoint"
  10.  
  11. // 创建ObsClient结构体
  12. var obsClient, _ = obs.New(ak, sk, endpoint)
  13.  
  14. func main() {
  15. input := &obs.UploadPartInput{}
  16. input.Bucket = "bucketname"
  17. input.Key = "objectkey"
  18. input.UploadId = "uploadid"
  19. input.SourceFile = "localfile"
  20. // 第一段分段号
  21. input.PartNumber = 1
  22. // 第一段的段偏移量
  23. input.Offset = 0
  24. // 第一段分段大小
  25. input.PartSize = 5 * 1024 * 1024
  26. // 上传第一段
  27. output, err := obsClient.UploadPart(input)
  28. if err == nil {
  29. // 获取上传成功的ETag
  30. fmt.Printf("ETag:%s\n", output.ETag)
  31. } else if obsError, ok := err.(obs.ObsError); ok {
  32. fmt.Printf("Code:%s\n", obsError.Code)
  33. fmt.Printf("Message:%s\n", obsError.Message)
  34. }
  35.  
  36. // 第二段分段号
  37. input.PartNumber = 2
  38. // 第二段的段偏移量
  39. input.Offset = 5 * 1024 * 1024
  40. // 第二段分段大小
  41. input.PartSize = 10 * 1024 * 1024
  42. // 上传第 二段
  43. output, err = obsClient.UploadPart(input)
  44. if err == nil {
  45. // 获取上传成功的ETag
  46. fmt.Printf("ETag:%s\n", output.ETag)
  47. } else if obsError, ok := err.(obs.ObsError); ok {
  48. fmt.Printf("Code:%s\n", obsError.Code)
  49. fmt.Printf("Message:%s\n", obsError.Message)
  50. }
  51. }

分段上传 - 图2 说明:

  • 上传段接口要求除最后一段以外,其他的段大小都要大于100KB。但是上传段接口并不会立即校验上传段的大小(因为不知道是否为最后一块);只有调用合并段接口时才会校验。
  • OBS会将服务端收到段数据的ETag值(段数据的MD5值)返回给用户。
  • 可以通过UploadPartInput.ContentMD5设置上传数据的MD5值,提供给OBS服务端用于校验数据完整性。
  • 分段号的范围是1~10000。如果超出这个范围,OBS将返回400 Bad Request错误。
  • OBS 3.0的桶支持最小段的大小为100KB,OBS 2.0的桶支持最小段的大小为5MB。请在OBS 3.0的桶上执行分段上传操作。

合并段

所有分段上传完成后,需要调用合并段接口来在OBS服务端生成最终对象。在执行该操作时,需要提供所有有效的分段列表(包括分段号和分段ETag值);OBS收到提交的分段列表后,会逐一验证每个段的有效性。当所有段验证通过后,OBS将把这些分段组合成最终的对象。

您可以通过ObsClient.CompleteMultipartUpload合并段:

  1. // 引入依赖包
  2. import (
  3. "fmt"
  4. "obs"
  5. )
  6.  
  7. var ak = "*** Provide your Access Key ***"
  8. var sk = "*** Provide your Secret Key ***"
  9. var endpoint = "https://your-endpoint"
  10.  
  11. // 创建ObsClient结构体
  12. var obsClient, _ = obs.New(ak, sk, endpoint)
  13.  
  14. func main() {
  15. input := &obs.CompleteMultipartUploadInput{}
  16. input.Bucket = "bucketname"
  17. input.Key = "objectkey"
  18. input.UploadId = "uploadid"
  19. input.Parts = []obs.Part{
  20. obs.Part{PartNumber: 1, ETag: "etag1"},
  21. obs.Part{PartNumber: 2, ETag: "etag2"},
  22. }
  23.  
  24. output, err := obsClient.CompleteMultipartUpload(input)
  25. if err == nil {
  26. fmt.Printf("RequestId:%s\n", output.RequestId)
  27. } else if obsError, ok := err.(obs.ObsError); ok {
  28. fmt.Printf("Code:%s\n", obsError.Code)
  29. fmt.Printf("Message:%s\n", obsError.Message)
  30. }
  31. }

分段上传 - 图3 说明:

分段可以是不连续的。

并发分段上传

分段上传的主要目的是解决大文件上传或网络条件较差的情况。下面的示例代码展示了如何使用分段上传并发上传大文件:

  1. // 引入依赖包
  2. import (
  3. "fmt"
  4. "obs"
  5. "os"
  6. )
  7.  
  8. var ak = "*** Provide your Access Key ***"
  9. var sk = "*** Provide your Secret Key ***"
  10. var endpoint = "https://your-endpoint"
  11. var bucketName = "bucketname"
  12. var objectKey = "objectkey"
  13. var filePath = "localfile"
  14.  
  15. // 创建ObsClient结构体
  16. var obsClient, _ = obs.New(ak, sk, endpoint)
  17.  
  18. func main() {
  19. // 初始化分段上传任务
  20. input := &obs.InitiateMultipartUploadInput{}
  21. input.Bucket = bucketName
  22. input.Key = objectKey
  23. output, err := obsClient.InitiateMultipartUpload(input)
  24.  
  25. if err != nil {
  26. panic(err)
  27. }
  28.  
  29. uploadId := output.UploadId
  30.  
  31. fmt.Printf("UploadId:%s\n", uploadId)
  32. fmt.Println()
  33.  
  34. // 每段上传100MB
  35. var partSize int64 = 100 * 1024 * 1024
  36.  
  37. stat, err := os.Stat(filePath)
  38. if err != nil {
  39. panic(err)
  40. }
  41. fileSize := stat.Size()
  42.  
  43. // 计算需要上传的段数
  44. partCount := int(fileSize / partSize)
  45.  
  46. if fileSize%partSize != 0 {
  47. partCount++
  48. }
  49.  
  50. // 执行并发上传段
  51. partChan := make(chan obs.Part, 5)
  52.  
  53. for i := 0; i < partCount; i++ {
  54. partNumber := i + 1
  55. offset := int64(i) * partSize
  56. currPartSize := partSize
  57. if i+1 == partCount {
  58. currPartSize = fileSize - offset
  59. }
  60. go func() {
  61. uploadPartInput := &obs.UploadPartInput{}
  62. uploadPartInput.Bucket = bucketName
  63. uploadPartInput.Key = objectKey
  64. uploadPartInput.UploadId = uploadId
  65. uploadPartInput.SourceFile = filePath
  66. // 分段号
  67. uploadPartInput.PartNumber = partNumber
  68. // 分段在文件中的起始位置
  69. uploadPartInput.Offset = offset
  70. // 分段大小
  71. uploadPartInput.PartSize = currPartSize
  72. uploadPartInputOutput, err := obsClient.UploadPart(uploadPartInput)
  73. if err != nil {
  74. panic(err)
  75. }
  76. fmt.Printf("%d finished\n", partNumber)
  77. partChan <- obs.Part{ETag: uploadPartInputOutput.ETag, PartNumber: uploadPartInputOutput.PartNumber}
  78. }()
  79. }
  80.  
  81. parts := make([]obs.Part, 0, partCount)
  82.  
  83. // 等待上传完成
  84. for {
  85. part, ok := <-partChan
  86. if !ok {
  87. break
  88. }
  89. parts = append(parts, part)
  90.  
  91. if len(parts) == partCount {
  92. close(partChan)
  93. }
  94. }
  95.  
  96. completeMultipartUploadInput := &obs.CompleteMultipartUploadInput{}
  97. completeMultipartUploadInput.Bucket = bucketName
  98. completeMultipartUploadInput.Key = objectKey
  99. completeMultipartUploadInput.UploadId = uploadId
  100. completeMultipartUploadInput.Parts = parts
  101. // 合并段
  102. completeMultipartUploadOutput, err := obsClient.CompleteMultipartUpload(completeMultipartUploadInput)
  103. if err != nil {
  104. panic(err)
  105. }
  106. fmt.Printf("RequestId:%s\n", completeMultipartUploadOutput.RequestId)
  107. }

分段上传 - 图4 说明:

大文件分段上传时,使用UploadPartInput.Offset和UploadPartInput.PartSize配合指定每段数据在文件中的起始结束位置。

取消分段上传任务

分段上传任务可以被取消,当一个分段上传任务被取消后,就不能再使用其Upload ID做任何操作,已经上传段也会被OBS删除。

您可以通过ObsClient.AbortMultipartUpload取消分段上传任务:

  1. // 引入依赖包
  2. import (
  3. "fmt"
  4. "obs"
  5. )
  6.  
  7. var ak = "*** Provide your Access Key ***"
  8. var sk = "*** Provide your Secret Key ***"
  9. var endpoint = "https://your-endpoint"
  10.  
  11. // 创建ObsClient结构体
  12. var obsClient, _ = obs.New(ak, sk, endpoint)
  13.  
  14. func main() {
  15. input := &obs.AbortMultipartUploadInput{}
  16. input.Bucket = "bucketname"
  17. input.Key = "objectkey"
  18. input.UploadId = "uploadid"
  19.  
  20. output, err := obsClient.AbortMultipartUpload(input)
  21. if err == nil {
  22. fmt.Printf("RequestId:%s\n", output.RequestId)
  23. } else if obsError, ok := err.(obs.ObsError); ok {
  24. fmt.Printf("Code:%s\n", obsError.Code)
  25. fmt.Printf("Message:%s\n", obsError.Message)
  26. }
  27. }

列举已上传的段

您可使用ObsClient.ListParts列举出某一分段上传任务所有已经上传成功的段。

该接口可设置的参数如下:


参数

作用

ListPartsInput.Bucket

分段上传任务所属的桶名。

ListPartsInput.Key

分段上传任务所属的对象名。

ListPartsInput.UploadId

分段上传任务全局唯一标识,从ObsClient.InitiateMultipartUpload返回的结果获取。

ListPartsInput.MaxParts

表示列举已上传的段返回结果最大段数目,即分页时每一页中段数目。

ListPartsInput.PartNumberMarker

表示待列出段的起始位置,只有Part Number大于该参数的段会被列出。
  • 简单列举
  1. // 引入依赖包
  2. import (
  3. "fmt"
  4. "obs"
  5. )
  6.  
  7. var ak = "*** Provide your Access Key ***"
  8. var sk = "*** Provide your Secret Key ***"
  9. var endpoint = "https://your-endpoint"
  10.  
  11. // 创建ObsClient结构体
  12. var obsClient, _ = obs.New(ak, sk, endpoint)
  13.  
  14. func main() {
  15. input := &obs.ListPartsInput{}
  16. input.Bucket = "bucketname"
  17. input.Key = "objectkey"
  18. input.UploadId = "uploadid"
  19.  
  20. output, err := obsClient.ListParts(input)
  21. if err == nil {
  22. for index, part := range output.Parts {
  23. fmt.Printf("Part[%d]-ETag:%s, PartNumber:%d, LastModified:%s, Size:%d\n", index, part.ETag,
  24. part.PartNumber, part.LastModified, part.Size)
  25. }
  26. } else if obsError, ok := err.(obs.ObsError); ok {
  27. fmt.Printf("Code:%s\n", obsError.Code)
  28. fmt.Printf("Message:%s\n", obsError.Message)
  29. }
  30. }

分段上传 - 图5 说明:

  • 列举段至多返回1000个段信息,如果指定的Upload ID包含的段数量大于1000,则返回结果中ListPartsOutput.IsTruncated为true表明本次没有返回全部段,并可通过ListPartsOutput.NextPartNumberMarker获取下次列举的起始位置。
  • 如果想获取指定Upload ID包含的所有分段,可以采用分页列举的方式。
  • 列举所有段
    由于ObsClient.ListParts只能列举至多1000个段,如果段数量大于1000,列举所有分段请参考如下示例:
  1. // 引入依赖包
  2. import (
  3. "fmt"
  4. "obs"
  5. )
  6.  
  7. var ak = "*** Provide your Access Key ***"
  8. var sk = "*** Provide your Secret Key ***"
  9. var endpoint = "https://your-endpoint"
  10.  
  11. // 创建ObsClient结构体
  12. var obsClient, _ = obs.New(ak, sk, endpoint)
  13.  
  14. func main() {
  15. input := &obs.ListPartsInput{}
  16. input.Bucket = "bucketname"
  17. input.Key = "objectkey"
  18. input.UploadId = "uploadid"
  19.  
  20. for {
  21. output, err := obsClient.ListParts(input)
  22. if err == nil {
  23. for index, part := range output.Parts {
  24. fmt.Printf("Part[%d]-ETag:%s, PartNumber:%d, LastModified:%s, Size:%d\n", index, part.ETag,
  25. part.PartNumber, part.LastModified, part.Size)
  26. }
  27. if output.IsTruncated {
  28. input.PartNumberMarker = output.NextPartNumberMarker
  29. } else {
  30. break
  31. }
  32. } else {
  33. if obsError, ok := err.(obs.ObsError); ok {
  34. fmt.Printf("Code:%s\n", obsError.Code)
  35. fmt.Printf("Message:%s\n", obsError.Message)
  36. }
  37. break
  38. }
  39. }
  40. }

列举分段上传任务

您可以通过ObsClient.ListMultipartUploads列举分段上传任务。列举分段上传任务可设置的参数如下:


参数

作用

ListMultipartUploadsInput.Bucket

桶名。

ListMultipartUploadsInput.Prefix

限定返回的分段上传任务中的对象名必须带有Prefix前缀。

ListMultipartUploadsInput.MaxUploads

列举分段上传任务的最大数目,取值范围为1~1000,当超出范围时,按照默认的1000进行处理。

ListMultipartUploadsInput.Delimiter

用于对分段上传任务中的对象名进行分组的字符。对于对象名中包含Delimiter的任务,其对象名(如果请求中指定了Prefix,则此处的对象名需要去掉Prefix)中从首字符至第一个Delimiter之间的字符串将作为一个分组并作为CommonPrefix返回。

ListMultipartUploadsInput.KeyMarker

表示列举时返回指定的KeyMarker之后的分段上传任务。

ListMultipartUploadsInput.UploadIdMarker

只有与KeyMarker参数一起使用时才有意义,用于指定返回结果的起始位置,即列举时返回指定KeyMarker的UploadIdMarker之后的分段上传任务。
  • 简单列举分段上传任务
  1. // 引入依赖包
  2. import (
  3. "fmt"
  4. "obs"
  5. )
  6.  
  7. var ak = "*** Provide your Access Key ***"
  8. var sk = "*** Provide your Secret Key ***"
  9. var endpoint = "https://your-endpoint"
  10.  
  11. // 创建ObsClient结构体
  12. var obsClient, _ = obs.New(ak, sk, endpoint)
  13.  
  14. func main() {
  15. input := &obs.ListMultipartUploadsInput{}
  16. input.Bucket = "bucketname"
  17. output, err := obsClient.ListMultipartUploads(input)
  18. if err == nil {
  19. for index, upload := range output.Uploads {
  20. fmt.Printf("Upload[%d]-OwnerId:%s, UploadId:%s, Key:%s, Initiated:%s,StorageClass:%s\n",
  21. index, upload.Owner.ID, upload.UploadId, upload.Key, upload.Initiated, upload.StorageClass)
  22. }
  23. } else if obsError, ok := err.(obs.ObsError); ok {
  24. fmt.Printf("Code:%s\n", obsError.Code)
  25. fmt.Printf("Message:%s\n", obsError.Message)
  26. }
  27. }

分段上传 - 图6 说明:

  • 列举分段上传任务至多返回1000个任务信息,如果指定的桶包含的分段上传任务数量大于1000,则返回结果中ListMultipartUploadsOutput.IsTruncated为true表明本次没有返回全部结果,并可通过ListMultipartUploadsOutput.NextKeyMarker和ListMultipartUploadsOutput.NextUploadIdMarker获取下次列举的起点。
  • 如果想获取指定桶包含的所有分段上传任务,可以采用分页列举的方式。
  • 列举全部分段上传任务
  1. // 引入依赖包
  2. import (
  3. "fmt"
  4. "obs"
  5. )
  6.  
  7. var ak = "*** Provide your Access Key ***"
  8. var sk = "*** Provide your Secret Key ***"
  9. var endpoint = "https://your-endpoint"
  10.  
  11. // 创建ObsClient结构体
  12. var obsClient, _ = obs.New(ak, sk, endpoint)
  13.  
  14. func main() {
  15. input := &obs.ListMultipartUploadsInput{}
  16. input.Bucket = "bucketname"
  17. output, err := obsClient.ListMultipartUploads(input)
  18. for {
  19. if err == nil {
  20. for index, upload := range output.Uploads {
  21. fmt.Printf("Upload[%d]-OwnerId:%s, UploadId:%s, Key:%s, Initiated:%s,StorageClass:%s\n",
  22. index, upload.Owner.ID, upload.UploadId, upload.Key, upload.Initiated, upload.StorageClass)
  23. }
  24. if output.IsTruncated {
  25. input.KeyMarker = output.NextKeyMarker
  26. input.UploadIdMarker = output.NextUploadIdMarker
  27. } else {
  28. break
  29. }
  30. } else {
  31. if obsError, ok := err.(obs.ObsError); ok {
  32. fmt.Printf("Code:%s\n", obsError.Code)
  33. fmt.Printf("Message:%s\n", obsError.Message)
  34. }
  35. break
  36. }
  37. }
  38. }

父主题:上传对象