Aiobotocore. Как переименовать 500 тысяч объектов в AWS S3

Aiobotocore. Как переименовать 500 тысяч объектов в AWS S3

Возникла необходимость переименовать несколько сотен тысяч объектов, хранящихся в AWS S3.  Можно нагуглить документацию и примеры на boto3, решив задачу синхронным способом. Такой скрипт работал безостановочно 4 дня, а затем остановился по моей же ошибке. Запускать скрипт снова – слишком долго, да и нет гарантии, что все файлы пройдут обработку. Работа с S3 API это в первую очередь работа с сетью(io bound). Для ускорения работы в таком случае можно использовать следующие модели конкурентности:

  • потоки
  • процессы
  • асинхронность на корутинах

Самый легкий путь – использовать асинхронность в одном потоке. Разбиваем задачу на мелкие асинхронные подзадачи, собираем все подзадачи в одной функции и запускаем event loop.

Для асинхронной работы с S3 можно использовать aiobotocore. Тем не менее библиотека плохо задокументирована и мне пришлось буквально методом тыка узнавать, как ей пользоваться.

В первую очередь создадим главную корутину – точку входа в приложение

import asyncio

import aiobotocore

BUCKET_NAME = 'xx'
AWS_ACCESS_KEY_ID = 'xx'
AWS_SECRET_ACCESS_KEY = 'xx'
AWS_S3_REGION_NAME = 'us-east-1'


async def main():
    session = aiobotocore.get_session()
    async with session.create_client(
            's3',
            region_name=AWS_S3_REGION_NAME,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            aws_access_key_id=AWS_ACCESS_KEY_ID
    ) as s3_client:
        pass


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())


В корутине main создаем S3 клиент. При запуске скрипта создаем ивент луп и крутим нашу главную корутину.

Для получения объектов из S3 существует операция list_objects, которая выдает информацию о 1000 объектах. Если же их больше, то придется использовать paginator.

async def main():
    session = aiobotocore.get_session()
    async with session.create_client(
            's3',
            region_name=AWS_S3_REGION_NAME,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            aws_access_key_id=AWS_ACCESS_KEY_ID
    ) as s3_client:
        paginator = s3_client.get_paginator('list_objects')

        async for page in paginator.paginate(
                Bucket=BUCKET_NAME,
                PaginationConfig={
                    'MaxItems': 1000_000_000,
                    'PageSize': 1000
                }):
            objects = page.get("Contents", [])
            objects = list(filter(lambda obj: obj['Key'].endswith('original'), objects))

PaginationConfig это словарь с конфигов для пагинатора. MaxItems ограничивает всю выборку данных, а PageSize устанавливает количество объектов на одной странице. Может показаться, что за одну страницу можно получить больше 1000 объектов, однако у меня это число осталось на уровне тысячи, когда я ставил больше.

В моем случае мне необходимо было переименовать файлы, которые оканчивались на original.

Осталось описать функцию для переименовывания файлов и запустить несколько тысяч таких корутин. Идея была бы хорошая, если бы все работало без проблем. Запустив несколько тысяч корутин мы столкнемся со следующими проблемами:

  • Нагрузка на сервер будет высокой, возможны потери соединения и обрубание коннекта
  • На машине, где запускается скрипт банально закончится количество возможных сетевых дескрипторов(сетевых соединений), которое регулируется операционной системой, и скрипт выдаст ошибку

В этом случае стоит ограничить количество одновременно работающих корутин с помощью использования очереди сообщений и воркеров, которые постоянно читают эту очередь.

import botocore



async def rename_object(s3_client, key, extension='.jpeg'):
    key_with_extension = key + extension
    try:
        await s3_client.head_object(Bucket=BUCKET_NAME, Key=key + extension)
    except botocore.exceptions.s3_clientError:
        copy_source = {
            'Bucket': BUCKET_NAME,
            'Key': key
        }
        await s3_client.copy_object(CopySource=copy_source, Bucket=BUCKET_NAME, Key=key_with_extension)
        await s3_client.delete_object(Bucket=BUCKET_NAME, Key=key)
        
        
async def renaming_worker(s3_client, queue):
    while True:
        try:
        	key = queue.get_nowait()
        except asyncio.QueueEmpty:
        	return
        if key is None:
            break
        await rename_object(s3_client, key)

renaming_worker в бесконечном цикле берет объекты из очереди и проводит операцию переименовывания. Если объекта в очереди нет, то get_nowait райзит ошибку QueueEmpty. Поэтому в главной корутине мы сначала заполним очередь, а затем запустим воркеры и будем ожидать их завершения.   Таким образом мы можем ограничивать количество одновременно работающих корутин и избавиться от проблем, когда их количество создает больше вреда, нежели пользы.

Для индикации прогресса отлично подойдет библиотека tqdm. Сначала соберем все ключи,  а затем запустим воркеры.

MAX_WORKERS_COUNT = 100


async def renaming_worker(s3_client, queue, pbar):
    while True:
        try:
            key = queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        if key is None:
            break
        await rename_object(s3_client, key)
        # Обновляем бар индикации прогресса
        pbar.update(1)


async def main():
    session = aiobotocore.get_session()
    async with session.create_client(
            's3',
            region_name=AWS_S3_REGION_NAME,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            aws_access_key_id=AWS_ACCESS_KEY_ID
    ) as s3_client:
        paginator = s3_client.get_paginator('list_objects')

        queue = asyncio.Queue()
        async for page in paginator.paginate(
                Bucket=BUCKET_NAME,
                PaginationConfig={
                    'MaxItems': 1000_000_000,
                    'PageSize': 1000
                }):
            objects = page.get("Contents", [])
        objects = list(filter(lambda obj: obj['Key'].endswith('original'), objects))
        for obj in objects:
            key = obj['Key']
            await queue.put(key)

    pbar = tqdm(total=queue.qsize())
    tasks = []
    for i in range(MAX_WORKERS_COUNT):
        tasks.append(renaming_worker(s3_client, queue, pbar))

    await asyncio.gather(*tasks)

В главной корутине мы создали объект tqdm, который можно обновлять после переименовывания изображения. В списке задач находятся корутины воркеры. В конце-концов главная корутина ожидает их завершения.

В результате получаем вот такую индикацию процесса работы с объектами. Итоговый скрипт выполнился за 5 часов.

Полный скрипт

import asyncio

import botocore
from tqdm import tqdm

import aiobotocore

MAX_WORKERS_COUNT = 100

BUCKET_NAME = 'xx'
AWS_ACCESS_KEY_ID = 'xx'
AWS_SECRET_ACCESS_KEY = 'xx'
AWS_S3_REGION_NAME = 'us-east-1'


async def rename_object(s3_client, key, extension='.jpeg'):
    key_with_extension = key + extension
    try:
        await s3_client.head_object(Bucket=BUCKET_NAME, Key=key + extension)
    except botocore.exceptions.s3_clientError:
        copy_source = {
            'Bucket': BUCKET_NAME,
            'Key': key
        }
        await s3_client.copy_object(CopySource=copy_source, Bucket=BUCKET_NAME, Key=key_with_extension)
        await s3_client.delete_object(Bucket=BUCKET_NAME, Key=key)


async def renaming_worker(s3_client, queue, pbar):
    while True:
        try:
            key = queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        if key is None:
            break
        await rename_object(s3_client, key)
        # Обновляем бар индикации прогресса
        pbar.update(1)


async def main():
    session = aiobotocore.get_session()
    async with session.create_client('s3',
                                     region_name=AWS_S3_REGION_NAME,
                                     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                     aws_access_key_id=AWS_ACCESS_KEY_ID
                                     ) as s3_client:
        paginator = s3_client.get_paginator('list_objects')

        queue = asyncio.Queue()
        async for page in paginator.paginate(Bucket=BUCKET_NAME,
                                             PaginationConfig={
                                                 'MaxItems': 1000_000_000,
                                                 'PageSize': 1000
                                             }):
            objects = page.get("Contents", [])
            objects = list(filter(lambda obj: obj['Key'].endswith('original'), objects))
            for obj in objects:
                key = obj['Key']
                await queue.put(key)

    pbar = tqdm(total=queue.qsize())
    tasks = []
    for i in range(MAX_WORKERS_COUNT):
        tasks.append(renaming_worker(s3_client, queue, pbar))

    await asyncio.gather(*tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())