Production task: migrating 40,000 zip archives across projects. The requirement was to download each archive, delete the files ending in html, txt, and url inside it, and upload the result to OSS. Before downloading, each address in the file list has to be cut apart, and the remainder joined with the download domain to form the actual download URL. Deleting files inside an archive uses the shell command zip -d. Recording it here. For the upload, the address from the file list is reused as the object key, but OSS rejects keys with a leading /; once I removed it with lstrip('/'), the upload stopped erroring.
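For example, with a made-up path standing in for a real line from the list (a minimal sketch, not the actual data):

# Hypothetical path from the file list; OSS object keys must not
# begin with '/', so strip it before calling put_object
raw_key = "/a/b/c/2023/batch_0001.zip"
oss_key = raw_key.lstrip('/')   # -> "a/b/c/2023/batch_0001.zip"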
I taught myself Python and my level is limited, so the code never feels quite ideal, but it does what I need; I can improve it bit by bit later.
Python3 async concurrent download script
import aiohttp
import asyncio
import os
import subprocess
import oss2
import logging

# File holding the list of paths to download
file_list = "zip.txt"
# Local directory the downloads are saved under
DATA_DIR = "/data/"
# Download domain prepended to each path from the list
BASE_URL = ""
# Maximum number of concurrent downloads
MAX_CONCURRENT_DOWNLOADS = 8
# Whether to delete the local file after a successful upload
DELETE_LOCAL_FILE_AFTER_UPLOAD = True
# Seconds to wait between retries, and the maximum retry count
RETRY_DELAY = 5
MAX_RETRIES = 3
# Download timeout in seconds
TIMEOUT = 300
# Log files recording successful and failed downloads
SUCCESS_FILE = os.path.join(DATA_DIR, "success_log.txt")
FAILURE_FILE = os.path.join(DATA_DIR, "failure_log.txt")
# OSS credentials
OSS_ACCESS_KEY_ID = ""
OSS_ACCESS_KEY_SECRET = ""
OSS_ENDPOINT = ""
OSS_BUCKET_NAME = ""
# Log file for the script itself
logging.basicConfig(filename='download_upload.log', level=logging.INFO)


async def extract_content_from_url(url):
    # Split the path: everything before the fourth '/' is the prefix,
    # the rest is the part appended to the download domain
    parts = url.split('/', 4)
    if len(parts) < 5:
        return '', ''
    first_part = '/' + '/'.join(parts[1:4])
    content = parts[4]
    return first_part, content


async def download_from_ku(url, session, semaphore, success_file, failure_file, retries=MAX_RETRIES):
    first_part, content = await extract_content_from_url(url)
    if not content:
        return None
    # Build the full download URL and the local target path
    full_url = BASE_URL + content
    data_dir = DATA_DIR + os.path.dirname(content)
    os.makedirs(data_dir, exist_ok=True)
    local_filename = os.path.join(data_dir, os.path.basename(content))
    if os.path.exists(local_filename):
        logging.info(f"File already exists: {local_filename}")
        return local_filename
    while retries > 0:
        try:
            async with semaphore, session.get(full_url, timeout=TIMEOUT) as response:
                response.raise_for_status()  # retry on HTTP errors instead of saving an error page
                with open(local_filename, 'wb') as file:
                    while True:
                        chunk = await response.content.read(8192)
                        if not chunk:
                            break
                        file.write(chunk)
            print(f"Download succeeded: {first_part}/{content}")
            logging.info(f"Download succeeded: {first_part}/{content}")
            with open(success_file, 'a') as success_log:
                success_log.write(f"Download succeeded: {first_part}/{content}\n")
            return local_filename
        except asyncio.TimeoutError:
            # Timeouts are not retried: log them and move on
            logging.warning(f"Download timed out, skipping {first_part}/{content}")
            with open(failure_file, 'a') as failure_log:
                failure_log.write(f"Download timed out: {first_part}/{content}\n")
            return None
        except Exception as e:
            print(f"Download error {content}: {e}")
            logging.error(f"Download error {content}: {e}")
            with open(failure_file, 'a') as failure_log:
                failure_log.write(f"Download error {content}: {e}\n")
            retries -= 1
            if retries == 0:
                logging.error(f"Max retries reached, giving up on {content}")
            else:
                logging.info(f"Retrying download of {content}, {retries} attempts left")
                await asyncio.sleep(RETRY_DELAY)
    return None


async def remove_files_from_zip(zip_file, *files_to_remove):
    # Delete the given patterns from the archive via the `zip -d` shell command;
    # zip_file is already the absolute local path returned by download_from_ku
    command = ["zip", "-d", zip_file] + list(files_to_remove)
    try:
        # check=True so a failing zip command raises CalledProcessError;
        # note zip exits nonzero (12) when no entry matches a pattern
        subprocess.run(command, check=True)
        logging.info(f"Removed files from: {zip_file}")
    except subprocess.CalledProcessError as e:
        logging.error(f"Error removing files from {zip_file}: {e}")


async def upload_to_oss(local_path, oss_key, retries=MAX_RETRIES):
    # Upload the file to OSS
    auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
    bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)
    while retries > 0:
        try:
            with open(local_path, 'rb') as file:
                bucket.put_object(oss_key, file)
            oss_url = f"https://{OSS_BUCKET_NAME}.{OSS_ENDPOINT}/{oss_key}"
            print(f"Upload succeeded: {oss_url}")
            logging.info(f"Upload succeeded: {oss_url}")
            if DELETE_LOCAL_FILE_AFTER_UPLOAD:
                os.remove(local_path)
            break
        except oss2.exceptions.OssError as e:
            print(f"Upload failed: {e}")
            logging.error(f"Upload failed: {e}")
            retries -= 1
            if retries == 0:
                logging.error(f"Max retries reached, giving up on uploading {local_path}")
            else:
                logging.info(f"Retrying upload of {local_path}, {retries} attempts left")
                await asyncio.sleep(RETRY_DELAY)


async def download_remove_upload_task(url, session, semaphore):
    first_part, content = await extract_content_from_url(url)
    if not content:
        return
    try:
        local_filename = await download_from_ku(url, session, semaphore, SUCCESS_FILE, FAILURE_FILE)
        if local_filename:
            await remove_files_from_zip(local_filename, "*.html", "*.txt", "*.url")
            # OSS object keys must not start with '/', hence the lstrip
            oss_key = first_part.lstrip('/') + '/' + os.path.dirname(content) + '/' + os.path.basename(local_filename)
            await upload_to_oss(local_filename, oss_key)
    except Exception as e:
        print(f"Error while processing {url}: {e}")
        logging.error(f"Error while processing {url}: {e}")


async def main():
    with open(file_list, "r") as file:
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS)
        connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT_DOWNLOADS)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Build one download/remove/upload task per line of the list
            tasks = [
                download_remove_upload_task(url.strip().strip('"'), session, semaphore)
                for url in file
            ]
            # Run all tasks concurrently
            await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
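zip.txt holds one path per line; surrounding quotes are tolerated because of strip('"'). A hypothetical example of its contents (the paths and the script filename are made up):

# zip.txt
"/a/b/c/2023/batch_0001.zip"
/a/b/c/2023/batch_0002.zip

After filling in BASE_URL and the OSS credentials, run it with python3 download_upload.py.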
Install the dependencies with pip3. Note that asyncio and subprocess ship with the Python standard library, so only aiohttp and oss2 actually need installing:
pip3 install aiohttp
pip3 install oss2
Brief notes on the script
Everything before the fourth / has to be stripped off to get the correct download path. split cuts the URL, and the prefix and the remainder are saved into separate variables:
async def extract_content_from_url(url):
    # Split the path: everything before the fourth '/' is the prefix,
    # the rest is the part appended to the download domain
    parts = url.split('/', 4)
    if len(parts) < 5:
        return '', ''
    first_part = '/' + '/'.join(parts[1:4])
    content = parts[4]
    return first_part, content
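Using a made-up path to show what the split produces:

url = "/a/b/c/2023/batch_0001.zip"   # hypothetical line from zip.txt
parts = url.split('/', 4)
# parts      == ['', 'a', 'b', 'c', '2023/batch_0001.zip']
# first_part == '/a/b/c'                (prefix dropped from the download URL)
# content    == '2023/batch_0001.zip'   (joined with BASE_URL for the download)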
Building the download/remove/upload task
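This is the piece of the script above that chains the three steps: download, strip the unwanted files with zip -d, then upload with the cleaned-up key:

async def download_remove_upload_task(url, session, semaphore):
    first_part, content = await extract_content_from_url(url)
    if not content:
        return
    try:
        local_filename = await download_from_ku(url, session, semaphore, SUCCESS_FILE, FAILURE_FILE)
        if local_filename:
            await remove_files_from_zip(local_filename, "*.html", "*.txt", "*.url")
            # OSS object keys must not start with '/', hence the lstrip
            oss_key = first_part.lstrip('/') + '/' + os.path.dirname(content) + '/' + os.path.basename(local_filename)
            await upload_to_oss(local_filename, oss_key)
    except Exception as e:
        print(f"Error while processing {url}: {e}")
        logging.error(f"Error while processing {url}: {e}")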
Finally, the main function launches everything with asyncio.gather(*tasks). I left it running in the background overnight; a few large files timed out during download, and after rerunning just the failures the job was done.
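Rerunning the failures was manual. A minimal sketch of how the timeout entries could be fed back in, assuming the failure-log format the script writes ("Download timed out: /a/b/c/...") and with retry.txt as a made-up filename:

# Collect the paths from timeout entries in failure_log.txt into a new
# list file, then point file_list at it and run the script again
with open("/data/failure_log.txt") as f, open("retry.txt", "w") as out:
    for line in f:
        if line.startswith("Download timed out: "):
            out.write(line.split(": ", 1)[1])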