Python3: asynchronously download zip archives, delete specified files inside them, then upload the archives to Aliyun OSS


A production job: migrating about 40k zip archives across projects. The requirement was to download each archive, delete the files inside it ending in html, txt, or url, and upload the result to OSS. Before downloading, the addresses in the file list had to be split, with the remainder joined onto the download domain to form the real URL. Deleting files inside an archive uses the shell command zip -d. One gotcha worth recording here: the upload has to use the address from the file list as the object key, but OSS rejects a key with a leading /, and stripping it with lstrip('/') before uploading made the error go away.
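As a minimal sketch of that key fix (the credentials, endpoint, bucket, and paths below are made up for illustration):

import oss2

auth = oss2.Auth("<access_key_id>", "<access_key_secret>")
bucket = oss2.Bucket(auth, "<endpoint>", "<bucket_name>")

raw_key = "/old/site/files/2023/08/pack_001.zip"  # address as it appears in the file list
oss_key = raw_key.lstrip('/')                     # OSS object keys must not start with '/'
bucket.put_object_from_file(oss_key, "/data/2023/08/pack_001.zip")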

I taught myself Python and my skill is limited, so the code never feels quite ideal to me, but it does what I need, and I can improve it bit by bit later.

The Python3 async concurrent download script

import aiohttp
import asyncio
import os
import subprocess
import oss2
import logging

# File containing the list of archive addresses
file_list = "zip.txt"

# Local data directory
DATA_DIR = "/data/"
BASE_URL = ""  # the download domain

# Maximum number of concurrent downloads
MAX_CONCURRENT_DOWNLOADS = 8

# Whether to delete the local file after a successful upload
DELETE_LOCAL_FILE_AFTER_UPLOAD = True  # or False

# Seconds to wait before retrying, and the maximum number of retries
RETRY_DELAY = 5
MAX_RETRIES = 3

# Download timeout in seconds
TIMEOUT = 300

# Log files recording successful and failed downloads
SUCCESS_FILE = os.path.join(DATA_DIR, "success_log.txt")
FAILURE_FILE = os.path.join(DATA_DIR, "failure_log.txt")

# OSS configuration
OSS_ACCESS_KEY_ID = ""
OSS_ACCESS_KEY_SECRET = ""
OSS_ENDPOINT = ""
OSS_BUCKET_NAME = ""

# Logging configuration
logging.basicConfig(filename='download_upload.log', level=logging.INFO)


def extract_content_from_url(url):
    # Split the address: everything before the fourth '/' is stripped off
    parts = url.split('/', 4)
    if len(parts) < 5:
        return None, None
    first_part = '/' + '/'.join(parts[1:4])
    content = parts[4]
    return first_part, content


async def download_from_ku(url, session, semaphore, success_file, failure_file, retries=MAX_RETRIES):
    first_part, content = extract_content_from_url(url)
    if not content:
        return None

    # Build the full download URL and the local target path
    full_url = BASE_URL + content
    data_dir = DATA_DIR + os.path.dirname(content)
    os.makedirs(data_dir, exist_ok=True)
    local_filename = os.path.join(data_dir, os.path.basename(content))

    if os.path.exists(local_filename):
        logging.info(f"File already exists: {local_filename}")
        return local_filename

    while retries > 0:
        try:
            async with semaphore, session.get(full_url, timeout=aiohttp.ClientTimeout(total=TIMEOUT)) as response:
                response.raise_for_status()
                with open(local_filename, 'wb') as file:
                    while True:
                        chunk = await response.content.read(8192)
                        if not chunk:
                            break
                        file.write(chunk)

            print(f"Download succeeded: {first_part}/{content}")
            logging.info(f"Download succeeded: {first_part}/{content}")
            with open(success_file, 'a') as success_log:
                success_log.write(f"Download succeeded: {first_part}/{content}\n")
            return local_filename

        except asyncio.TimeoutError:
            logging.warning(f"Download timed out, skipping {first_part}/{content}")
            with open(failure_file, 'a') as failure_log:
                failure_log.write(f"Download timed out: {first_part}/{content}\n")
            return None

        except Exception as e:
            print(f"Download error {content}: {e}")
            logging.error(f"Download error {content}: {e}")
            with open(failure_file, 'a') as failure_log:
                failure_log.write(f"Download error {content}: {e}\n")
            retries -= 1
            if retries == 0:
                logging.error(f"Max retries reached, giving up on {content}")
            else:
                logging.info(f"Retrying {content}, attempts left: {retries}")
                await asyncio.sleep(RETRY_DELAY)
    return None


async def remove_files_from_zip(zip_file, *files_to_remove):
    # Delete the given file patterns inside the archive by calling `zip -d`;
    # zip_file is already a full local path here
    command = ["zip", "-d", zip_file] + list(files_to_remove)
    result = subprocess.run(command, capture_output=True, text=True)
    # zip exits with 12 when the archive contains none of the patterns; treat that as success
    if result.returncode in (0, 12):
        logging.info(f"Removed files from: {zip_file}")
    else:
        logging.error(f"Error removing files from {zip_file}: {result.stderr}")


async def upload_to_oss(local_path, oss_key, retries=MAX_RETRIES):
    # Upload a file to OSS
    auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
    bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)

    while retries > 0:
        try:
            with open(local_path, 'rb') as file:
                bucket.put_object(oss_key, file)

            oss_url = f"https://{OSS_BUCKET_NAME}.{OSS_ENDPOINT}/{oss_key}"
            print(f"Upload succeeded: {oss_url}")
            logging.info(f"Upload succeeded: {oss_url}")
            if DELETE_LOCAL_FILE_AFTER_UPLOAD:
                os.remove(local_path)
            break

        except oss2.exceptions.OssError as e:
            print(f"Upload failed: {e}")
            logging.error(f"Upload failed: {e}")
            retries -= 1
            if retries == 0:
                logging.error(f"Max retries reached, giving up on {local_path}")
            else:
                logging.info(f"Retrying upload of {local_path}, attempts left: {retries}")
                await asyncio.sleep(RETRY_DELAY)


async def download_remove_upload_task(url, session, semaphore):
    first_part, content = extract_content_from_url(url)
    if not content:
        return

    try:
        local_filename = await download_from_ku(url, session, semaphore, SUCCESS_FILE, FAILURE_FILE)
        if local_filename:
            await remove_files_from_zip(local_filename, "*.html", "*.txt", "*.url")
            # The OSS key is the original list address without its leading '/'
            oss_key = first_part.lstrip('/') + '/' + content
            await upload_to_oss(local_filename, oss_key)
    except Exception as e:
        print(f"Error while processing {url}: {e}")
        logging.error(f"Error while processing {url}: {e}")


async def main():
    with open(file_list, "r") as file:
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS)
        connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT_DOWNLOADS)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Build the task list
            tasks = [
                download_remove_upload_task(url.strip().strip('"'), session, semaphore)
                for url in file
            ]
            # Run the download-remove-upload tasks concurrently
            await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
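To make the address handling concrete, assume BASE_URL ends with a trailing slash (say "https://download.example.com/", made up here) and zip.txt holds one quoted address per line. A made-up line like

"/old/site/files/2023/08/pack_001.zip"

would be resolved by the script into:

full_url       = https://download.example.com/2023/08/pack_001.zip
local_filename = /data/2023/08/pack_001.zip
oss_key        = old/site/files/2023/08/pack_001.zip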

Install the third-party dependencies with pip3. Note that asyncio and subprocess ship with the Python standard library, so only aiohttp and oss2 need installing:

pip3 install aiohttp oss2

Brief notes on the script

Everything before the fourth / has to be stripped off to get the correct download path. split does the cutting, and the leading prefix and the remaining path are each saved into their own variable:

def extract_content_from_url(url):
    # Split the address: everything before the fourth '/' is stripped off
    parts = url.split('/', 4)
    if len(parts) < 5:
        return None, None
    first_part = '/' + '/'.join(parts[1:4])
    content = parts[4]
    return first_part, content
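A quick sanity check of the split, with the same made-up address as above:

>>> extract_content_from_url("/old/site/files/2023/08/pack_001.zip")
('/old/site/files', '2023/08/pack_001.zip')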
(Screenshot: the file list zip.txt)

(Screenshot: the download log)

Building the download-remove-upload task: download_from_ku, remove_files_from_zip, and upload_to_oss are chained together in download_remove_upload_task, with the remove step shelling out to zip -d.
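Since zip -d requires the zip binary on the machine, a pure-Python alternative using the stdlib zipfile module is possible; this is just a sketch of that idea, not the approach used above (it rewrites the archive, keeping only the wanted entries):

import os
import zipfile

def remove_suffixes_from_zip(zip_path, suffixes=(".html", ".txt", ".url")):
    # Rewrite the archive, copying over only the entries we want to keep
    tmp_path = zip_path + ".tmp"
    with zipfile.ZipFile(zip_path, "r") as src, \
         zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as dst:
        for item in src.infolist():
            if not item.filename.lower().endswith(suffixes):
                dst.writestr(item, src.read(item.filename))
    os.replace(tmp_path, zip_path)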

Finally, the main function launches everything with asyncio.gather(*tasks). I left it running in the background overnight; a handful of large files timed out while downloading, and rerunning just those failures finished the job.
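For that rerun, one option is to pull the timed-out addresses back out of failure_log.txt into a fresh list file and point the script at it; a rough sketch, assuming the log format written by the script above:

# Rebuild a list file from the timeout entries in failure_log.txt
with open("/data/failure_log.txt") as log, open("zip_retry.txt", "w") as out:
    for line in log:
        if line.startswith("Download timed out: "):
            # The rest of the line is the original list address, e.g. /old/site/files/...
            out.write(line[len("Download timed out: "):])

# Then set file_list = "zip_retry.txt" and run the script again.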