AWS Developer Blog

Parallelizing Large Uploads for Speed and Reliability

by Magnus Bjorkman | in Java

As Big Data grows in popularity, it becomes more important to move large data sets to and from Amazon S3. You can improve the speed of uploads by parallelizing them. You can break an individual file into multiple parts and upload those parts in parallel by setting the following in the AWS SDK for Java:

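import java.util.concurrent.Executors;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;

// Upload through a fixed pool of 5 worker threads.
TransferManager tx = new TransferManager(
        new AmazonS3Client(new DefaultAWSCredentialsProviderChain()),
        Executors.newFixedThreadPool(5));

// Use multipart uploads above 5 MiB, with parts of at least 5 MiB.
TransferManagerConfiguration config = new TransferManagerConfiguration();
config.setMultipartUploadThreshold(5L * 1024 * 1024);
config.setMinimumUploadPartSize(5L * 1024 * 1024);
tx.setConfiguration(config);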

There are a few things to note:

  • When we create the TransferManager, we give it an execution pool of 5 threads. By default, the TransferManager creates a pool of 10, but you can supply your own executor, as we do here, to change the pool size.
  • The TransferManagerConfiguration allows us to set the limits used by the AWS SDK for Java to break large files into smaller parts.
  • MultipartUploadThreshold defines the size at which the AWS SDK for Java should start breaking apart the files (in this case, 5 MiB).
  • MinimumUploadPartSize defines the minimum size of each part. It must be at least 5 MiB (only the last part of an upload may be smaller); otherwise, you will get an error when you try to upload it.
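
To get a feel for how these two settings interact, the following sketch estimates how many parts a file would be split into. It is illustrative arithmetic only and not the SDK's actual implementation: the class and method names are hypothetical, it assumes that only files strictly larger than the threshold are split, and it assumes the part size grows just enough to stay within the Amazon S3 limit of 10,000 parts per multipart upload.

```java
// Illustrative arithmetic only; the SDK may choose part sizes differently.
final class PartSizing {
    static final long MIB = 1024L * 1024L;
    // Amazon S3 allows at most 10,000 parts in one multipart upload.
    static final long MAX_PARTS = 10_000L;

    /** Smallest part size that respects the configured minimum and the part limit. */
    static long partSize(long fileSize, long minimumPartSize) {
        long neededForLimit = (fileSize + MAX_PARTS - 1) / MAX_PARTS; // ceiling division
        return Math.max(minimumPartSize, neededForLimit);
    }

    /** Estimated number of parts; 1 when the file is at or below the threshold. */
    static long partCount(long fileSize, long threshold, long minimumPartSize) {
        if (fileSize <= threshold) {
            return 1; // assumed to be sent as a single request
        }
        long size = partSize(fileSize, minimumPartSize);
        return (fileSize + size - 1) / size; // ceiling division
    }

    public static void main(String[] args) {
        long threshold = 5 * MIB;
        long minPart = 5 * MIB;
        System.out.println(partCount(3 * MIB, threshold, minPart));          // small file
        System.out.println(partCount(12 * MIB, threshold, minPart));         // 12 MiB file
        System.out.println(partCount(100L * 1024 * MIB, threshold, minPart)); // 100 GiB file
    }
}
```

Under these assumptions, a 12 MiB file becomes three parts, so at most three of the five threads in the pool are busy with it. A single file needs to be several times larger than the part size before it can keep the whole pool working.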

In addition to improving upload speed, splitting an upload into parts makes it more reliable: a failure affects only one small part rather than the entire upload, and the retry logic built into the AWS SDK for Java will try to upload that part again. You can control the retry policy when you create the S3 client.

import com.amazonaws.ClientConfiguration;
import com.amazonaws.retry.PredefinedRetryPolicies;

// Raise the maximum number of retries from the default to 5.
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setRetryPolicy(
        PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(5));
TransferManager tx = new TransferManager(
        new AmazonS3Client(new DefaultAWSCredentialsProviderChain(),
                clientConfiguration),
        Executors.newFixedThreadPool(5));

Here we change the default setting of 3 retry attempts to 5. You can also implement your own back-off strategies and define which failures are retryable.
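
To make the idea of a back-off strategy concrete, here is a minimal sketch of capped exponential back-off in plain Java. It deliberately does not use the SDK's retry interfaces: BackoffSketch, delayMillis, and callWithRetries are hypothetical names chosen for illustration, and the sketch treats every exception as retryable, which a real retry condition would not.

```java
import java.util.concurrent.Callable;

// A generic sketch of capped exponential back-off; not the SDK's retry code.
final class BackoffSketch {

    /** Delay before retry number retriesAttempted (0-based): base * 2^n, capped at maxMillis. */
    static long delayMillis(int retriesAttempted, long baseMillis, long maxMillis) {
        int shift = Math.min(retriesAttempted, 20); // keep the shift small to avoid overflow
        return Math.min(maxMillis, baseMillis << shift);
    }

    /** Runs the task, retrying up to maxRetries times after a failure. */
    static <T> T callWithRetries(Callable<T> task, int maxRetries,
                                 long baseMillis, long maxMillis) throws Exception {
        int retriesAttempted = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                // Assumption: every exception is transient. Real code should
                // retry only failures that are known to be safe to repeat.
                if (retriesAttempted >= maxRetries) {
                    throw e; // out of retries: surface the last failure
                }
                Thread.sleep(delayMillis(retriesAttempted, baseMillis, maxMillis));
                retriesAttempted++;
            }
        }
    }
}
```

With a base of 100 ms and a cap of 20 seconds, the delays in this sketch grow as 100, 200, 400, 800 ms and so on until they reach the cap. Doubling the delay gives a struggling service time to recover instead of hitting it again immediately.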

Although this is useful for a single file, especially a large one, people often have large collections of files. If those files are close to the multipart threshold in size, or smaller, each one is split into only a few parts, or not at all, so you need to submit multiple files to the TransferManager at the same time to get the benefits of parallelization. This requires a little more effort but is straightforward. First, we define a list of uploads.

List<PutObjectRequest> objectList = new ArrayList<PutObjectRequest>();
objectList.add(new PutObjectRequest("mybucket", "folder/myfile1.dat",
        new File("/localfolder/myfile1.dat")));
objectList.add(new PutObjectRequest("mybucket", "folder/myfile2.dat",
        new File("/localfolder/myfile2.dat")));

Then we can submit the files for upload:

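CountDownLatch doneSignal = new CountDownLatch(objectList.size());
List<Upload> uploads = new ArrayList<Upload>();
for (PutObjectRequest object : objectList) {
    object.setGeneralProgressListener(new UploadCompleteListener(
            object.getFile(),
            object.getBucketName() + "/" + object.getKey(),
            doneSignal));
    uploads.add(tx.upload(object)); // returns immediately; the transfer runs in the pool
}
try {
    doneSignal.await();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // preserve the interrupt status for callers
    // UploadException is an application-defined exception, not part of the SDK.
    throw new UploadException("Couldn't wait for all uploads to be finished");
}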

The upload command is simple: just call the upload method on the TransferManager. That method is not blocking, so it will just schedule the upload and immediately return. To track progress and figure out when the uploads are complete:

  • We use a CountDownLatch, initializing it to the number of files to upload.
  • We register a general progress listener with each PutObjectRequest, so we can capture major events, including completion and failures that will count down the CountDownLatch.
  • After we have submitted all of the uploads, we use the CountDownLatch to wait for the uploads to complete.
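
The same submit-then-wait pattern can be tried out without any AWS dependencies. The sketch below is a stand-in under stated assumptions: LatchSketch and runAll are hypothetical names, an ExecutorService plays the role of the TransferManager, and incrementing a counter replaces the real upload. It also waits with a timeout, which is a design choice of this sketch rather than something the code above does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the upload flow: non-blocking submits, then one wait on a latch.
final class LatchSketch {

    /** Submits taskCount stand-in tasks, waits for all of them, and returns how many ran. */
    static int runAll(int taskCount, int threads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch doneSignal = new CountDownLatch(taskCount);
        AtomicInteger finished = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                // submit() returns immediately, like TransferManager.upload().
                pool.submit(() -> {
                    try {
                        finished.incrementAndGet(); // stand-in for the real upload work
                    } finally {
                        doneSignal.countDown(); // count down on success and on failure
                    }
                });
            }
            // A timeout keeps the caller from blocking forever if a task never reports back.
            if (!doneSignal.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for tasks");
            }
            return finished.get();
        } finally {
            pool.shutdown();
        }
    }
}
```

Counting down in a finally block mirrors the listener's job of counting down on both completion and failure: the waiting thread is released no matter how each task ends.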

The following is a simple implementation of the progress listener that allows us to track the uploads. It also contains some print statements to allow us to see what is happening when we test this:

class UploadCompleteListener implements ProgressListener {

    private static final Log log = LogFactory.getLog(UploadCompleteListener.class);

    private final File f;
    private final String target;
    private final CountDownLatch doneSignal;

    public UploadCompleteListener(File f, String target, CountDownLatch doneSignal) {
        this.f = f;
        this.target = target;
        this.doneSignal = doneSignal;
    }

    @Override
    public void progressChanged(ProgressEvent progressEvent) {
        String transfer = f.getAbsolutePath() + " -> " + target;
        switch (progressEvent.getEventType()) {
            case TRANSFER_STARTED_EVENT:
                log.info("Started to upload: " + transfer);
                break;
            case TRANSFER_COMPLETED_EVENT:
                log.info("Completed upload: " + transfer);
                doneSignal.countDown();
                break;
            case TRANSFER_FAILED_EVENT:
                // A failed upload still counts down so the waiting thread is released.
                log.error("Failed upload: " + transfer);
                doneSignal.countDown();
                break;
            default:
                break; // ignore byte-transfer and other intermediate events
        }
    }
}

After you have finished, don’t forget to shut down the transfer manager.

tx.shutdownNow();

Another great option for moving very large data sets is the AWS Import/Export Snowball service, a petabyte-scale data transport solution that uses secure appliances to transfer large amounts of data into and out of the AWS cloud.