Files
dsProject/dsLightRag/Dsideal/YunXiao/Fetch.py
2025-08-14 15:45:08 +08:00

234 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 请先安装必要的库: pip install tqdm requests
import os
import json
import threading
import requests
from tqdm import tqdm
def fetch_lesson_data(page_num=1, page_size=10, search_keyword="", timeout=30):
    """Fetch one page of on-demand cloud-classroom lessons via POST.

    Args:
        page_num: Page number, default 1.
        page_size: Number of items per page, default 10.
        search_keyword: Search keyword, default empty string.
        timeout: Request timeout in seconds (default 30); prevents the call
            from blocking forever on a stalled connection.

    Returns:
        The parsed JSON response (dict) on success, or None on any failure.
    """
    url = "https://yx.ccsjy.cn/api/cloud-school/v1/cloudLesson/getOnDemandLessonPage"
    # Request body; the book/edition/stage/subject IDs are fixed for this dataset.
    payload = {
        "pageNum": page_num,
        "pageSize": page_size,
        "businessBookId": "3029115d4afa424f8160adb04bd10e6a",
        "businessEditionId": "19F93E4A7C9B4B589EB001FBFEE6230A",
        "excellentFlag": "",
        "nodeId": None,
        "nodeType": 1,
        "stageCode": "3",
        "subjectCode": "314",
        "sortType": 2,
        "source": "",
        "searchKeyword": search_keyword
    }
    try:
        response = requests.post(url, json=payload, timeout=timeout)
        if response.status_code == 200:
            return response.json()
        print(f"请求失败,状态码: {response.status_code}")
        return None
    # ValueError covers a 200 response whose body is not valid JSON.
    except (requests.RequestException, ValueError) as e:
        print(f"请求异常: {str(e)}")
        return None
# Multi-threaded HTTP downloader using Range requests.
class MultiThreadDownloader:
    """Download a single file over HTTP, splitting it into byte ranges
    fetched by several threads, then merging the parts.

    Falls back to a plain single-threaded download when the file size
    cannot be determined up front.
    """

    # Per-request timeout (seconds) so a stalled connection cannot hang a thread.
    REQUEST_TIMEOUT = 30

    def __init__(self, url, file_path, num_threads=4):
        self.url = url
        self.file_path = file_path
        self.num_threads = num_threads
        self.total_size = 0
        self.progress = None
        # (thread_id, error message) tuples collected from worker threads;
        # lets start() detect partial failures instead of merging corrupt parts.
        self._errors = []

    def get_file_size(self):
        """Probe the remote file size with a HEAD request.

        Returns:
            True if Content-Length was obtained (stored in self.total_size),
            False otherwise (including on network errors).
        """
        try:
            response = requests.head(
                self.url, allow_redirects=True, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException:
            return False
        if 'content-length' in response.headers:
            self.total_size = int(response.headers['content-length'])
            return True
        return False

    def download_chunk(self, start, end, thread_id):
        """Download bytes [start, end] into '<file_path>.part<thread_id>'.

        Any network error is recorded in self._errors rather than raised,
        because exceptions inside a Thread target would otherwise vanish.
        """
        headers = {'Range': f'bytes={start}-{end}'}
        try:
            # Context manager closes the streamed connection when done.
            with requests.get(self.url, headers=headers, stream=True,
                              timeout=self.REQUEST_TIMEOUT) as response:
                with open(f'{self.file_path}.part{thread_id}', 'wb') as f:
                    for data in response.iter_content(chunk_size=1024):
                        f.write(data)
                        if self.progress:
                            self.progress.update(len(data))
        except requests.RequestException as e:
            self._errors.append((thread_id, str(e)))

    def merge_chunks(self):
        """Concatenate all part files into file_path and delete the parts."""
        with open(self.file_path, 'wb') as f:
            for i in range(self.num_threads):
                part_file = f'{self.file_path}.part{i}'
                if os.path.exists(part_file):
                    with open(part_file, 'rb') as pf:
                        f.write(pf.read())
                    os.remove(part_file)

    def start(self):
        """Run the download. Returns True on success, False on failure."""
        # Make sure the destination directory exists.
        os.makedirs(os.path.dirname(self.file_path), exist_ok=True)
        if not self.get_file_size():
            print(f"无法获取文件大小,使用单线程下载: {self.url}")
            return self._download_single_threaded()
        # Split the byte range evenly; the last thread takes the remainder.
        chunk_size = self.total_size // self.num_threads
        self.progress = tqdm(total=self.total_size, unit='B', unit_scale=True,
                             desc=self.file_path.split('\\')[-1])
        threads = []
        for i in range(self.num_threads):
            start = i * chunk_size
            end = self.total_size - 1 if i == self.num_threads - 1 else (i + 1) * chunk_size - 1
            thread = threading.Thread(target=self.download_chunk, args=(start, end, i))
            threads.append(thread)
            thread.start()
        # Wait for every worker before touching the part files.
        for thread in threads:
            thread.join()
        self.progress.close()
        if self._errors:
            # Do not merge incomplete parts; report failure to the caller.
            for thread_id, msg in self._errors:
                print(f"线程 {thread_id} 下载失败: {msg}")
            return False
        self.merge_chunks()
        return True

    def _download_single_threaded(self):
        """Fallback: stream the whole file in one request."""
        try:
            with requests.get(self.url, stream=True,
                              timeout=self.REQUEST_TIMEOUT) as response:
                with open(self.file_path, 'wb') as f:
                    total_size = int(response.headers.get('content-length', 0))
                    with tqdm(total=total_size, unit='B', unit_scale=True,
                              desc=self.file_path.split('\\')[-1]) as pbar:
                        for data in response.iter_content(chunk_size=1024):
                            f.write(data)
                            pbar.update(len(data))
            return True
        except requests.RequestException as e:
            print(f"单线程下载失败: {str(e)}")
            return False
# Script entry point: page through all lessons, then download each video.
if __name__ == "__main__":
    # Create the download directory next to this script.
    down_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Down")
    os.makedirs(down_dir, exist_ok=True)
    # Fetch one record first, just to learn the total record count.
    first_page = fetch_lesson_data(page_num=1, page_size=1)
    if first_page:
        total = first_page['data']['total']
        page_size = 10  # records per page
        total_pages = (total + page_size - 1) // page_size  # ceiling division
        print(f"获取成功,共{total}条记录")
        # Collect every page of lesson rows.
        all_lessons = []
        for page_num in range(1, total_pages + 1):
            print(f"正在获取第{page_num}/{total_pages}页数据...")
            result = fetch_lesson_data(page_num=page_num, page_size=page_size)
            if result and result['data']['rows']:
                all_lessons.extend(result['data']['rows'])
            else:
                print(f"{page_num}页数据获取失败")
        print(f"成功获取{len(all_lessons)}条课程数据")
        # Track per-lesson download outcomes for the JSON report.
        download_info = []
        success_count = 0
        fail_count = 0
        for idx, lesson in enumerate(all_lessons, 1):
            lesson_id = lesson.get('lessonId')
            lesson_name = lesson.get('lessonName')
            print(f"\n处理课程 {idx}/{len(all_lessons)}: {lesson_name}")
            # Extract the video URL; assumes the first resource is the video.
            lesson_resources = lesson.get('lessonResources', [])
            video_url = None
            safe_name = None
            if lesson_resources:
                file_url = lesson_resources[0].get('fileUrl')
                # Guard: fileUrl may be missing/None — concatenating or
                # splitting None would raise TypeError/AttributeError.
                if file_url:
                    video_url = "https://ccschool.edusoa.com/" + file_url
                    safe_name = file_url.split('/')[-1]
            if video_url:
                file_path = os.path.join(down_dir, f"{safe_name}")
                # Skip files already downloaded on a previous run.
                if os.path.exists(file_path):
                    print(f"文件已存在,跳过下载: {file_path}")
                    download_info.append({
                        "course_name": lesson_name,
                        "course_id": lesson_id,
                        "file_path": file_path,
                        "status": "已存在"
                    })
                    continue
                print(f"开始下载视频: {video_url}")
                downloader = MultiThreadDownloader(video_url, file_path)
                if downloader.start():
                    print(f"视频下载成功: {file_path}")
                    download_info.append({
                        "course_name": lesson_name,
                        "course_id": lesson_id,
                        "file_path": file_path,
                        "status": "成功"
                    })
                    success_count += 1
                else:
                    print(f"视频下载失败: {video_url}")
                    download_info.append({
                        "course_name": lesson_name,
                        "course_id": lesson_id,
                        "file_path": file_path,
                        "status": "失败"
                    })
                    fail_count += 1
            else:
                print("未找到视频URL")
                download_info.append({
                    "course_name": lesson_name,
                    "course_id": lesson_id,
                    "file_path": "",
                    "status": "无视频URL"
                })
                fail_count += 1
            # Persist progress every 5 lessons (and at the very end) so a
            # crash mid-run does not lose the report.
            if idx % 5 == 0 or idx == len(all_lessons):
                info_file = os.path.join(down_dir, "download_info.json")
                with open(info_file, 'w', encoding='utf-8') as f:
                    json.dump(download_info, f, ensure_ascii=False, indent=2)
                print(f"已保存下载信息到: {info_file}")
        print(f"\n下载完成!成功: {success_count}, 失败: {fail_count}")
    else:
        print("获取课程数据失败")