Aiobotocore. Как переименовать 500 тысяч объектов в AWS S3

Возникла необходимость переименовать несколько сотен тысяч объектов, хранящихся в AWS S3. Можно нагуглить документацию и примеры на boto3, решив задачу синхронным способом. Такой скрипт работал безостановочно 4 дня, а затем остановился по моей же ошибке. Запускать скрипт снова – слишком долго, да и нет гарантии, что все файлы пройдут обработку. Работа с S3 API это в первую очередь работа с сетью(io bound). Для ускорения работы в таком случае можно использовать следующие модели конкурентности:
- потоки
- процессы
- асинхронность на корутинах
Самый легкий путь – использовать асинхронность в одном потоке. Разбиваем задачу на мелкие асинхронные подзадачи, собираем все подзадачи в одной функции и запускаем event loop.
Для асинхронной работы с S3 можно использовать aiobotocore. Тем не менее библиотека плохо задокументирована и мне пришлось буквально методом тыка узнавать, как ей пользоваться.
В первую очередь создадим главную корутину – точку входа в приложение
import asyncio
import aiobotocore
BUCKET_NAME = 'xx'
AWS_ACCESS_KEY_ID = 'xx'
AWS_SECRET_ACCESS_KEY = 'xx'
AWS_S3_REGION_NAME = 'us-east-1'
async def main():
session = aiobotocore.get_session()
async with session.create_client(
's3',
region_name=AWS_S3_REGION_NAME,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID
) as s3_client:
pass
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
В корутине main создаем S3 клиент. При запуске скрипта создаем ивент луп и крутим нашу главную корутину.
Для получения объектов из S3 существует операция list_objects, которая выдает информацию о 1000 объектах. Если же их больше, то придется использовать paginator.
async def main():
session = aiobotocore.get_session()
async with session.create_client(
's3',
region_name=AWS_S3_REGION_NAME,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID
) as s3_client:
paginator = s3_client.get_paginator('list_objects')
async for page in paginator.paginate(
Bucket=BUCKET_NAME,
PaginationConfig={
'MaxItems': 1000_000_000,
'PageSize': 1000
}):
objects = page.get("Contents", [])
objects = list(filter(lambda obj: obj['Key'].endswith('original'), objects))
PaginationConfig это словарь с конфигов для пагинатора. MaxItems ограничивает всю выборку данных, а PageSize устанавливает количество объектов на одной странице. Может показаться, что за одну страницу можно получить больше 1000 объектов, однако у меня это число осталось на уровне тысячи, когда я ставил больше.
В моем случае мне необходимо было переименовать файлы, которые оканчивались на original.
Осталось описать функцию для переименовывания файлов и запустить несколько тысяч таких корутин. Идея была бы хорошая, если бы все работало без проблем. Запустив несколько тысяч корутин мы столкнемся со следующими проблемами:
- Нагрузка на сервер будет высокой, возможны потери соединения и обрубание коннекта
- На машине, где запускается скрипт банально закончится количество возможных сетевых дескрипторов(сетевых соединений), которое регулируется операционной системой, и скрипт выдаст ошибку
В этом случае стоит ограничить количество одновременно работающих корутин с помощью использования очереди сообщений и воркеров, которые постоянно читают эту очередь.

import botocore
async def rename_object(s3_client, key, extension='.jpeg'):
key_with_extension = key + extension
try:
await s3_client.head_object(Bucket=BUCKET_NAME, Key=key + extension)
except botocore.exceptions.s3_clientError:
copy_source = {
'Bucket': BUCKET_NAME,
'Key': key
}
await s3_client.copy_object(CopySource=copy_source, Bucket=BUCKET_NAME, Key=key_with_extension)
await s3_client.delete_object(Bucket=BUCKET_NAME, Key=key)
async def renaming_worker(s3_client, queue):
while True:
try:
key = queue.get_nowait()
except asyncio.QueueEmpty:
return
if key is None:
break
await rename_object(s3_client, key)
renaming_worker в бесконечном цикле берет объекты из очереди и проводит операцию переименовывания. Если объекта в очереди нет, то get_nowait райзит ошибку QueueEmpty. Поэтому в главной корутине мы сначала заполним очередь, а затем запустим воркеры и будем ожидать их завершения. Таким образом мы можем ограничивать количество одновременно работающих корутин и избавиться от проблем, когда их количество создает больше вреда, нежели пользы.
Для индикации прогресса отлично подойдет библиотека tqdm. Сначала соберем все ключи, а затем запустим воркеры.
MAX_WORKERS_COUNT = 100
async def renaming_worker(s3_client, queue, pbar):
while True:
try:
key = queue.get_nowait()
except asyncio.QueueEmpty:
return
if key is None:
break
await rename_object(s3_client, key)
# Обновляем бар индикации прогресса
pbar.update(1)
async def main():
session = aiobotocore.get_session()
async with session.create_client(
's3',
region_name=AWS_S3_REGION_NAME,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID
) as s3_client:
paginator = s3_client.get_paginator('list_objects')
queue = asyncio.Queue()
async for page in paginator.paginate(
Bucket=BUCKET_NAME,
PaginationConfig={
'MaxItems': 1000_000_000,
'PageSize': 1000
}):
objects = page.get("Contents", [])
objects = list(filter(lambda obj: obj['Key'].endswith('original'), objects))
for obj in objects:
key = obj['Key']
await queue.put(key)
pbar = tqdm(total=queue.qsize())
tasks = []
for i in range(MAX_WORKERS_COUNT):
tasks.append(renaming_worker(s3_client, queue, pbar))
await asyncio.gather(*tasks)
В главной корутине мы создали объект tqdm, который можно обновлять после переименовывания изображения. В списке задач находятся корутины воркеры. В конце-концов главная корутина ожидает их завершения.
В результате получаем вот такую индикацию процесса работы с объектами. Итоговый скрипт выполнился за 5 часов.

Полный скрипт
import asyncio
import botocore
from tqdm import tqdm
import aiobotocore
MAX_WORKERS_COUNT = 100
BUCKET_NAME = 'xx'
AWS_ACCESS_KEY_ID = 'xx'
AWS_SECRET_ACCESS_KEY = 'xx'
AWS_S3_REGION_NAME = 'us-east-1'
async def rename_object(s3_client, key, extension='.jpeg'):
key_with_extension = key + extension
try:
await s3_client.head_object(Bucket=BUCKET_NAME, Key=key + extension)
except botocore.exceptions.s3_clientError:
copy_source = {
'Bucket': BUCKET_NAME,
'Key': key
}
await s3_client.copy_object(CopySource=copy_source, Bucket=BUCKET_NAME, Key=key_with_extension)
await s3_client.delete_object(Bucket=BUCKET_NAME, Key=key)
async def renaming_worker(s3_client, queue, pbar):
while True:
try:
key = queue.get_nowait()
except asyncio.QueueEmpty:
return
if key is None:
break
await rename_object(s3_client, key)
# Обновляем бар индикации прогресса
pbar.update(1)
async def main():
session = aiobotocore.get_session()
async with session.create_client('s3',
region_name=AWS_S3_REGION_NAME,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID
) as s3_client:
paginator = s3_client.get_paginator('list_objects')
queue = asyncio.Queue()
async for page in paginator.paginate(Bucket=BUCKET_NAME,
PaginationConfig={
'MaxItems': 1000_000_000,
'PageSize': 1000
}):
objects = page.get("Contents", [])
objects = list(filter(lambda obj: obj['Key'].endswith('original'), objects))
for obj in objects:
key = obj['Key']
await queue.put(key)
pbar = tqdm(total=queue.qsize())
tasks = []
for i in range(MAX_WORKERS_COUNT):
tasks.append(renaming_worker(s3_client, queue, pbar))
await asyncio.gather(*tasks)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())