Uploading large files in parts to S3 with asyncio

In this lesson I'll show you how to upload large files to S3 using aiobotocore, an async version of the boto3 library.
I didn't find any examples of using async Python to upload large files, so I hope this post will help you.


Multipart upload to S3 using asyncio

Running S3 compatible object storage in docker

Docker is a tool that lets you easily package and run software on your machine using OS-level containerization. You can download it from the Docker website.

First of all, we need to run S3 storage somewhere. You can use hosted AWS S3, but for testing you can use the free minio image from Docker Hub.
Minio is an object storage server with an S3-compatible API, so it can be used through AWS S3 client libraries.

To run minio on your local machine using Docker, copy and paste the following command:

docker run \
  -p 9000:9000 \
  -p 9001:9001 \
  -e "MINIO_ROOT_USER=AKIAIOSFODNN7EXAMPLE" \
  -e "MINIO_ROOT_PASSWORD=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" \
  quay.io/minio/minio server /data --console-address ":9001"
Running minio using docker

After that you need to create a bucket and obtain an access key along with a secret access key. To do that, open your browser and navigate to 127.0.0.1:9001 (the console port from the docker command above) to see the login page.

  1. Use the environment variables from the docker command above: AKIAIOSFODNN7EXAMPLE as the login and wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY as the password to sign in.
  2. Click the Buckets button in the left menu and create a bucket. In this case we named it test-bucket (the bucket can also be created from code, as shown in the sketch after this list).
  3. Then click the Users button and create a user with FAKE_KEY as both the access key and the secret access key. Grant all needed policies (permissions) to the created user. For instance, we will need readwrite.
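
If you prefer to script the bucket creation instead of clicking through the console, here is a minimal sketch using boto3 against the local minio endpoint with the root credentials from the docker command above (boto3 is assumed to be installed; the user and its access keys still have to be created in the console):

import boto3

# Minimal sketch: create the test bucket on the local minio instance
# using the root credentials from the docker command above.
s3 = boto3.client(
    's3',
    endpoint_url='http://127.0.0.1:9000',
    aws_access_key_id='AKIAIOSFODNN7EXAMPLE',
    aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
)
s3.create_bucket(Bucket='test-bucket')
Creating the bucket from code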

Check out the video below if you want some visuals on how to set up minio.

What is multipart upload

Multipart upload is an S3 API that allows you to upload a file in several parts.

Let's say we have a social network service, and within the service each user has an image gallery.
We want the ability to download an archive with all of a user's photos.
How can we do that?

  1. Gather all user photos inside the image gallery
  2. Create an archive
  3. Upload the archive into S3

We have a couple of problems here:

  1. According to the S3 documentation you can't upload a file larger than 5GB to S3 in a single PUT operation.
  2. A bad network may cause issues that prevent a large archive from being uploaded.

Solution - Multipart upload

Multipart upload is designed to solve the issues with uploading large files, from 5MB up to 5TB in size. The AWS docs recommend considering it when the file size is greater than 100MB.
So, in our case it's the best solution for uploading an archive of gathered photos, since the size of the archive may be greater than 100MB.

Multipart upload steps

In order to use multipart upload you have to follow several steps:

  1. Send a create multipart upload request and receive the UploadId in the response
# Step 1 - create multipart upload
# Important args:
# 	Bucket     - bucket name where file will be stored;
#	Key        - path in S3 under which the file will be available
#	Expires    - when uploaded parts will be deleted if multipart upload fails;
# Important response attributes:
#	UploadId   - identifier of the multipart upload


create_multipart_upload_resp = client.create_multipart_upload(
    Bucket=AWS_S3_BUCKET_NAME,
    Key=key,
    Expires=datetime.now() + timedelta(days=1),
)

upload_id: str = create_multipart_upload_resp['UploadId'] 
Step 1 - create multipart upload

2. Split the file into several chunks, then upload each chunk, providing the part number, upload id, data, etc. Info about each uploaded chunk must be recorded somewhere; it will be used to complete the multipart upload.

# Step 2 - upload part
# Important args:
# 	Bucket     - bucket name where file will be stored;
#	Body 	   - chunk data in bytes;
#	UploadId   - id of the multipart upload which identifies uploading file;
#	PartNumber - number which identifies a chunk of data;
#	Key        - path in S3 under which the file will be available;
# Important response attributes:
#	ETag 	   - identifier of the uploaded part, needed to complete the upload;


# Used to complete multipart upload
part_info = {
    'Parts': []
}
part_number = 1

response = client.upload_part(
    Bucket=AWS_S3_BUCKET_NAME,
    Body=chunk,
    UploadId=upload_id,
    PartNumber=part_number,
    Key=key
)

part_info['Parts'].append(
    {
        'PartNumber': part_number,
        'ETag': response['ETag']
    }
)
Step 2 - upload a chunk of data

3. Send a list parts request with the upload id and receive info about the uploaded parts, including their count.

# Step 3 - list parts actually stored in s3
# Important args:
# 	Bucket     - bucket name where file will be stored;
#	UploadId   - id of the multipart upload which identifies uploading file;
#	Key        - path in S3 under which the file will be available;
# Important response attributes:
#	Parts 	   - uploaded parts registered in S3;


list_parts_response = client.list_parts(
    Bucket=AWS_S3_BUCKET_NAME,
    Key=key,
    UploadId=upload_id
)
chunks_saved_in_s3: int = len(list_parts_response['Parts'])
Step 3 - list parts actually stored in s3

4. Compare the number of parts uploaded to S3 with your local parts info.
If the expected number of parts equals the count of parts inside S3, complete the multipart upload; otherwise, abort it.

# Step 4 - complete or abort multipart upload
# Important args:
# 	Bucket          - bucket name where file will be stored;
#	UploadId        - id of the multipart upload which identifies uploading file;
#	Key             - path in S3 under which the file will be available;
#   MultipartUpload - dict with parts recorded info(part number and etag);
# Important response attributes:
#	Location 	    - path where file is stored, might be used to save in db;

chunks_count_expected = 30 # We are going to calculate this using file size
chunks_count_saved_in_s3: int = len(list_parts_response['Parts'])

# You have to sort parts in ascending order. 
# Otherwise api will throw an error
parts_sorted = sorted(part_info['Parts'], key=lambda k: k['PartNumber'])
part_info['Parts'] = parts_sorted

# If quantity of chunks in S3 is the same as we expected then complete,
# otherwise - abort.
ready_to_complete = chunks_count_saved_in_s3 == chunks_count_expected

if ready_to_complete:
    print('Done uploading file')
    client.complete_multipart_upload(
        Bucket=AWS_S3_BUCKET_NAME,
        Key=key,
        UploadId=upload_id,
        MultipartUpload=part_info
    )
else:
    print('Aborted uploading file.')
    client.abort_multipart_upload(
        Bucket=AWS_S3_BUCKET_NAME,
        Key=key,
        UploadId=upload_id
    )
Step 4 - complete or abort multipart upload
A multipart upload that is neither completed nor aborted costs money

When you upload chunks of data using multipart upload and don't complete or abort the upload, it leads to several issues. Parts stored before completion are hidden inside S3, which means they still occupy space. If you fail to finish the multipart upload in the right way, they are going to cost you extra money. That's why I pass an Expires attribute in the create multipart upload API call; note, however, that on AWS S3 the documented way to clean up abandoned parts automatically is a bucket lifecycle rule that aborts incomplete multipart uploads (see the sketch below).

Uploaded parts in S3 are hidden until expiration or a complete/abort API call
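
If your S3 backend supports bucket lifecycle configuration (recent minio versions do), you can also configure the bucket once so that abandoned parts are removed automatically. Below is a minimal sketch using put_bucket_lifecycle_configuration with the same client and bucket name as in the snippets above; the rule id and the one-day window are just example values.

# Minimal sketch: a bucket lifecycle rule that aborts incomplete multipart
# uploads one day after they were initiated.
# With the async aiobotocore client, add "await" before the call.
client.put_bucket_lifecycle_configuration(
    Bucket=AWS_S3_BUCKET_NAME,
    LifecycleConfiguration={
        'Rules': [
            {
                'ID': 'abort-incomplete-multipart-uploads',  # example rule id
                'Status': 'Enabled',
                'Filter': {'Prefix': ''},  # apply to the whole bucket
                'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1},
            },
        ]
    },
)
Lifecycle rule that cleans up abandoned multipart uploads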

Splitting a file into chunks

How do we divide a file into chunks? Let's recall what a file object represents.
A file holds bytes; just imagine an array of bytes. That is the file.
Reading from a file is handled using a pointer to the byte number from which we start reading. The pointer is moved via the seek operation. For example, seek(0) moves the pointer inside a file to the zeroth byte, whereas seek(6) moves it to the sixth byte.

Starting from there we can read the next N bytes using the read operation. For instance, say we want to read and save the first 5MB of a file. Let's calculate how much that is in bytes: 1024 bytes = 1KB, 1024KB = 1MB, so 5MB = 5 * 1024 * 1024 = 5242880 bytes to read from the file to retrieve the first chunk.
You can do that with file.read(5242880).

Reading the first 5MB of the file into a chunk

To read the first chunk of data we move the file pointer to the beginning of the first chunk and call the read operation to get the first 5MB. The seek offset here equals 0.

To read the next chunk of data we move the pointer to the beginning of the second chunk. The seek offset here equals 1 * BYTES_PER_CHUNK; in general, chunk number N starts at offset N * BYTES_PER_CHUNK, as the sketch below shows.
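
Here is a minimal synchronous sketch of this seek/read logic; read_chunk and its arguments are illustrative names rather than part of the final script.

BYTES_PER_CHUNK = 1024 * 1024 * 5  # 5MB


def read_chunk(path, chunk_number):
    # Move the pointer to the beginning of the requested chunk
    # and read at most BYTES_PER_CHUNK bytes from there.
    with open(path, 'rb') as file:
        file.seek(chunk_number * BYTES_PER_CHUNK)
        return file.read(BYTES_PER_CHUNK)


first_chunk = read_chunk('large.txt', 0)   # bytes 0 .. 5242879
second_chunk = read_chunk('large.txt', 1)  # bytes 5242880 .. 10485759
Reading an arbitrary chunk using seek and read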

Number of chunks

To calculate the number of chunks we first need the size of the file we are reading. You can get it with os.stat('large.txt').st_size. Next we divide this number by the bytes-per-chunk value: number_of_chunks = source_size / BYTES_PER_CHUNK. In Python the result will be a float, so let's use math.ceil and convert the result to an integer: chunks_count = int(math.ceil(source_size / BYTES_PER_CHUNK)).
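
Putting that together (this is the same calculation used in the full script below):

import math
import os

BYTES_PER_CHUNK = 1024 * 1024 * 5  # 5MB

source_size = os.stat('large.txt').st_size
chunks_count = int(math.ceil(source_size / BYTES_PER_CHUNK))
Calculating the number of chunks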

Async upload

In order to speed up our upload we are going to utilize the power of asyncio. This means we will split our uploading into several tasks – asyncio coroutines.

A coroutine is a single unit of execution running inside a single thread. Its benefit is that everything happens in one thread, which means fewer OS resources and less headache with synchronization compared to multithreaded programming. Each task runs until a blocking IO operation appears; this point is marked with the await keyword. After starting the IO request, the coroutine goes back into the waiting list and the event loop picks up another ready-to-go coroutine.

Install required libraries:

pip install aiobotocore aiofiles
For historical reasons, operating systems don't provide a fully asynchronous file API, which is why asyncio doesn't offer async file IO in the standard library. The aiofiles package uses a thread pool under the hood to make such operations possible. To use aiobotocore you write the same API calls as with boto3 and simply add "await" before each of them.

We are ready to write the async upload. Note that the minimum part size for S3 multipart upload is 5MB (except for the last part)!

import asyncio
import math
import os
from datetime import datetime, timedelta

import aiobotocore
import aiofiles

AWS_S3_HOST = 'http://localhost:9000'
AWS_SECRET_ACCESS_KEY = 'FAKE_KEY'
AWS_ACCESS_KEY_ID = 'FAKE_KEY'
AWS_MULTIPART_BYTES_PER_CHUNK = 1024 * 1024 * 5  # ~ 5mb
AWS_S3_BUCKET_NAME = 'test-bucket'
EXPIRATION_DAYS = 1

# We have to keep info about uploaded parts.
# https://github.com/boto/boto3/issues/50#issuecomment-72079954
part_info = {
    'Parts': []
}
# File object is distributed across coroutines.
# Let's make sure it doesn't make troubles as a shared resource
file_shared_lock = asyncio.Lock()


async def upload_chunk(
        client,
        file,
        upload_id,
        chunk_number,
        bytes_per_chunk,
        source_size,
        key
):
    offset = chunk_number * bytes_per_chunk
    remaining_bytes = source_size - offset
    bytes_to_read = min([bytes_per_chunk, remaining_bytes])
    part_number = chunk_number + 1

    async with file_shared_lock:
        # At this moment the execution returns back to the event loop
        # Another coroutine might make another .seek operation
        # which leads to reading another chunk.
        # That's why we need a lock(asyncio lock) here
        await file.seek(offset)
        chunk = await file.read(bytes_to_read)

    resp = await client.upload_part(
        Bucket=AWS_S3_BUCKET_NAME,
        Body=chunk,
        UploadId=upload_id,
        PartNumber=part_number,
        Key=key
    )

    global part_info
    part_info['Parts'].append(
        {
            'PartNumber': part_number,
            'ETag': resp['ETag']
        }
    )


async def begin_multipart_upload(
        source_path,
        s3_destination_folder_path,
        host=AWS_S3_HOST,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        bytes_per_chunk=AWS_MULTIPART_BYTES_PER_CHUNK
):
    filename = os.path.basename(source_path)
    key = '{}/{}'.format(s3_destination_folder_path, filename)

    # NOTE: on aiobotocore >= 2.0, use aiobotocore.session.get_session() instead
    session = aiobotocore.get_session()
    async with session.create_client(
            's3', endpoint_url=host,
            aws_secret_access_key=aws_secret_access_key,
            aws_access_key_id=aws_access_key_id
    ) as client:

        source_size = os.stat(source_path).st_size
        chunks_count = int(math.ceil(source_size / float(bytes_per_chunk)))
        print('chunks_count: ', chunks_count)

        create_multipart_upload_resp = await client.create_multipart_upload(
            Bucket=AWS_S3_BUCKET_NAME,
            Key=key,
            Expires=datetime.now() + timedelta(days=EXPIRATION_DAYS),
        )

        upload_id = create_multipart_upload_resp['UploadId']

        tasks = []
        async with aiofiles.open(source_path, mode='rb') as file:
            for chunk_number in range(chunks_count):
                tasks.append(
                    upload_chunk(
                        client=client,
                        file=file,
                        chunk_number=chunk_number,
                        bytes_per_chunk=bytes_per_chunk,
                        key=key, upload_id=upload_id,
                        source_size=source_size
                    )
                )

            await asyncio.gather(*tasks)

        list_parts_resp = await client.list_parts(
            Bucket=AWS_S3_BUCKET_NAME,
            Key=key,
            UploadId=upload_id
        )

        # You have to sort parts in ascending order. 
        # Otherwise api will throw an error
        part_list = sorted(part_info['Parts'], key=lambda k: k['PartNumber'])
        part_info['Parts'] = part_list
		
        is_finished = len(list_parts_resp['Parts']) == chunks_count
        
        if is_finished:
            print('Done uploading file')
            await client.complete_multipart_upload(
                Bucket=AWS_S3_BUCKET_NAME,
                Key=key,
                UploadId=upload_id,
                MultipartUpload=part_info
            )

        else:
            print('Aborted uploading file.')
            await client.abort_multipart_upload(
                Bucket=AWS_S3_BUCKET_NAME,
                Key=key,
                UploadId=upload_id
            )


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(
        begin_multipart_upload(
            source_path='./large.txt',
            s3_destination_folder_path='large',
        )
    )

Generate a 5GB test file.

On Linux

dd if=/dev/urandom of=large.txt bs=2048 count=2500000

On macOS

mkfile 5g large.txt


Does asyncio give a speed boost?

According to my manual tests it gives a small speed advantage over a single PUT operation. Just a little. Adjusting the chunk size may affect the total time, so it's worth playing around with different values. Honestly, I don't know if this is really a good solution to use. Report any bugs or optimizations you find.

Possible issues and further optimizations

The provided code is just an example and requires some enhancements on your side for your particular situation. You need to adjust the chunk size to your needs. Also be aware that launching a lot of such coroutines at once may cause a "too many open file descriptors" error; in that case you can implement a pool of coroutines via a queue, or cap the concurrency as in the sketch below.
You can also optimize RAM usage, which is not covered in this article.
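
One simple way to cap the concurrency is sketched below using asyncio.Semaphore instead of a queue-based pool; MAX_CONCURRENT_UPLOADS is an example value and upload_chunk refers to the function from the script above.

import asyncio

MAX_CONCURRENT_UPLOADS = 8  # example value, tune it for your machine and network
semaphore = asyncio.Semaphore(MAX_CONCURRENT_UPLOADS)


async def bounded_upload_chunk(**kwargs):
    # Only MAX_CONCURRENT_UPLOADS coroutines may run upload_chunk at once;
    # the rest wait here until a slot is freed.
    async with semaphore:
        await upload_chunk(**kwargs)
Capping the number of concurrent upload coroutines

Build the tasks list from bounded_upload_chunk(...) instead of upload_chunk(...) before passing it to asyncio.gather.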

Big thanks to the authors who shared solutions related to multipart upload.

PS: Also thanks to Zvika Ferentz, who contacted me, pointed out a bug in the code, and shared his solution.

You can leave your comments here: https://gist.github.com/skonik/b20a21fc39f97e16c979c49267d90e05