AWS Developer Tools Blog

AWS SDK for Go – Batch Operations with Amazon S3

The v1.9.44 release of the AWS SDK for Go adds support for batched operations in the s3manager package. This enables you to easily upload, download, and delete Amazon S3 objects in batches. The feature uses the iterator pattern, also known as the scanner pattern, so that you can extend the batching behavior with your own iterators. This blog post shows how to use and extend the new batched operations to fit a given use case.
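If the scanner pattern is unfamiliar, the standard library's bufio.Scanner is a well-known instance of it: call a method that advances and returns a bool until it returns false, then check for an error once at the end. The following is a minimal sketch using only the standard library. The collectLines helper is a hypothetical name chosen for illustration and is not part of the SDK.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// collectLines drains a bufio.Scanner using the scanner pattern:
// call Scan until it returns false, then check Err once.
func collectLines(input string) ([]string, error) {
	scanner := bufio.NewScanner(strings.NewReader(input))
	var lines []string
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	// Scan returns false both at the end of input and on failure,
	// so Err is what distinguishes the two cases.
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return lines, nil
}

func main() {
	lines, err := collectLines("first\nsecond\nthird")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(lines), lines)
}
```

The iterators in s3manager follow the same shape: a Next method that advances, an Err method that reports why iteration stopped, and a method that returns the current item.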

Deleting objects using ListObjectsIterator

  sess := session.Must(session.NewSession(&aws.Config{}))
  svc := s3.New(sess)

  input := &s3.ListObjectsInput{
    Bucket:  aws.String("bucket"),
    MaxKeys: aws.Int64(100),
  }
  // Create a delete list objects iterator
  iter := s3manager.NewDeleteListIterator(svc, input)
  // Create the BatchDelete client
  batcher := s3manager.NewBatchDeleteWithClient(svc)

  if err := batcher.Delete(aws.BackgroundContext(), iter); err != nil {
    panic(err)
  }

This example lists all objects in the bucket named "bucket", one hundred at a time, and deletes them. It creates a new delete list iterator, which supplies the objects to delete, and a BatchDelete client, which performs the deletions. The Delete method on the client requires a BatchDeleteIterator, and the delete list iterator satisfies that interface.
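To make the division of labor between an iterator and a batch client more concrete, here is a rough sketch of how a consumer might drain an iterator into fixed-size batches. It is not the SDK's implementation. The keyIterator interface, the sliceIterator type, and the drainInBatches function are hypothetical stand-ins that only mirror the Next/Err shape described above, and they use the standard library alone.

```go
package main

import (
	"errors"
	"fmt"
)

// keyIterator is a hypothetical stand-in with the same shape as the SDK's
// iterators: Next advances, Err reports failure, Key returns the current item.
type keyIterator interface {
	Next() bool
	Err() error
	Key() string
}

// sliceIterator iterates over an in-memory list of keys.
type sliceIterator struct {
	keys    []string
	current string
	err     error
}

func (it *sliceIterator) Next() bool {
	if it.err != nil || len(it.keys) == 0 {
		return false
	}
	it.current, it.keys = it.keys[0], it.keys[1:]
	return true
}

func (it *sliceIterator) Err() error  { return it.err }
func (it *sliceIterator) Key() string { return it.current }

// drainInBatches reads keys from the iterator and hands them to flush in
// groups of at most batchSize, flushing any remainder at the end.
func drainInBatches(it keyIterator, batchSize int, flush func([]string) error) error {
	if batchSize <= 0 {
		return errors.New("batchSize must be positive")
	}
	batch := make([]string, 0, batchSize)
	for it.Next() {
		batch = append(batch, it.Key())
		if len(batch) == batchSize {
			if err := flush(batch); err != nil {
				return err
			}
			// Start a fresh slice so flush may keep the previous one.
			batch = make([]string, 0, batchSize)
		}
	}
	if err := it.Err(); err != nil {
		return err
	}
	if len(batch) > 0 {
		return flush(batch)
	}
	return nil
}

func main() {
	it := &sliceIterator{keys: []string{"a", "b", "c", "d", "e"}}
	err := drainInBatches(it, 2, func(batch []string) error {
		fmt.Println("flushing", batch)
		return nil
	})
	if err != nil {
		panic(err)
	}
}
```

Because the consumer depends only on the interface, any type that provides the same methods can supply items, which is what makes custom iterators possible.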

Creating a custom iterator

The SDK enables you to pass custom iterators to the new batched operations. For example, if we want to upload a directory, none of the default iterators do this easily. The following example shows how to implement a custom iterator that uploads a directory to S3.

package main

import (
  "fmt"
  "os"
  "path/filepath"

  "github.com/aws/aws-sdk-go/aws"
  "github.com/aws/aws-sdk-go/aws/session"
  "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// DirectoryIterator iterates through files and directories to be uploaded
// to S3.
type DirectoryIterator struct {
  filePaths []string
  bucket    string
  next      struct {
    path string
    f    *os.File
  }
  err error
}

// NewDirectoryIterator creates and returns a new BatchUploadIterator
func NewDirectoryIterator(bucket, dir string) s3manager.BatchUploadIterator {
  paths := []string{}
  walkErr := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
    // Stop on walk errors; info may be nil when err is set
    if err != nil {
      return err
    }
    // We care only about files, not directories
    if !info.IsDir() {
      paths = append(paths, path)
    }
    return nil
  })

  return &DirectoryIterator{
    filePaths: paths,
    bucket:    bucket,
    err:       walkErr,
  }
}

// Next opens the next file and stops iteration if it fails to open
// a file.
func (iter *DirectoryIterator) Next() bool {
  if iter.err != nil || len(iter.filePaths) == 0 {
    iter.next.f = nil
    return false
  }

  f, err := os.Open(iter.filePaths[0])
  iter.err = err

  iter.next.f = f
  iter.next.path = iter.filePaths[0]

  iter.filePaths = iter.filePaths[1:]
  return iter.Err() == nil
}

// Err returns an error that was set while walking the directory or
// opening a file
func (iter *DirectoryIterator) Err() error {
  return iter.err
}

// UploadObject returns a BatchUploadObject and sets the After field to
// close the file.
func (iter *DirectoryIterator) UploadObject() s3manager.BatchUploadObject {
  f := iter.next.f
  return s3manager.BatchUploadObject{
    Object: &s3manager.UploadInput{
      Bucket: &iter.bucket,
      Key:    &iter.next.path,
      Body:   f,
    },
    // After was introduced in version 1.10.7
    After: func() error {
      return f.Close()
    },
  }
}

We have defined a new iterator named DirectoryIterator. It satisfies the BatchUploadIterator interface by defining the three required methods: Next, Err, and UploadObject. The Next method tells the batch operation whether to continue iterating. Err returns the error, if any, that stopped the iteration. In this example, Err reports a failure to open a file; when that happens, Next returns false. Finally, UploadObject returns the BatchUploadObject that is used to upload contents to the service. Here it holds an input object and a closure in the After field. The closure closes the file once it is no longer needed, which ensures that we’re not leaking file handles. Now let’s define our main function using what we defined above.

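func main() {
  if len(os.Args) != 4 {
    fmt.Fprintf(os.Stderr, "usage: %s <region> <bucket> <path>\n", os.Args[0])
    os.Exit(1)
  }
  region := os.Args[1]
  bucket := os.Args[2]
  path := os.Args[3]

  iter := NewDirectoryIterator(bucket, path)
  // session.New is deprecated; NewSession reports configuration errors
  uploader := s3manager.NewUploader(session.Must(session.NewSession(&aws.Config{
    Region: &region,
  })))

  if err := uploader.UploadWithIterator(aws.BackgroundContext(), iter); err != nil {
    panic(err)
  }
  // Report any error that stopped the iteration early
  if err := iter.Err(); err != nil {
    panic(err)
  }
  fmt.Printf("Successfully uploaded %q to %q\n", path, bucket)
}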

You can verify that the directory has been uploaded by looking in S3.
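When you look, note that the iterator above uses each file's local path, as reported by filepath.Walk, as the object key. If you would rather have keys relative to the uploaded directory, one possible approach is sketched below. The objectKey helper is a hypothetical name introduced here for illustration. It is not part of the SDK and uses only the standard library.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// objectKey is a hypothetical helper that turns a local file path into an
// S3 key relative to root, using forward slashes regardless of the OS.
func objectKey(root, path string) (string, error) {
	rel, err := filepath.Rel(root, path)
	if err != nil {
		return "", err
	}
	return filepath.ToSlash(rel), nil
}

func main() {
	key, err := objectKey("photos", filepath.Join("photos", "2017", "cat.jpg"))
	if err != nil {
		panic(err)
	}
	fmt.Println(key)
}
```

A custom iterator could call a helper like this when it records each path, assuming it also keeps the original path around for opening the file.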

Please chat with us on Gitter and file feature requests or issues on GitHub. We look forward to your feedback and recommendations!