# Forum code-box header ("[Python] plain-text view / copy code") left over from copy-paste; not part of the script.
import requests
import re
from urllib.parse import urlparse, urljoin
import json
def _build_file_url(dom, path):
    """Join the AJAX response's ``dom`` and ``url`` fields into an absolute file URL."""
    # A trailing slash on the host would otherwise yield "host//file/...".
    domain = dom.rstrip('/')
    if not (domain.startswith('//') or domain.startswith('http')):
        domain = '//' + domain
    file_url = domain + '/file/' + path
    if not file_url.startswith('http'):
        # At this point the URL can only be protocol-relative ("//host/...");
        # give it an explicit https scheme.
        file_url = 'https:' + file_url
    return file_url


def _fetch_secondary_url(session, share_url, base_domain, headers):
    """Step 1: load the share page and return the absolute URL of its ``/fn?`` iframe, or None."""
    try:
        print(f"步骤1: 访问分享链接 [{share_url}]")
        response = session.get(share_url, headers=headers, timeout=10)
        response.raise_for_status()
        # The share page embeds the real download page in an iframe whose
        # src starts with "/fn?".
        iframe_match = re.search(r'<iframe.*?src="(/fn\?[^"]+)"', response.text)
        if not iframe_match:
            print("❌ 未找到iframe链接,请检查分享链接格式")
            return None
        secondary_url = urljoin(base_domain, iframe_match.group(1))
        print(f"✅ 发现二级链接: {secondary_url}")
        return secondary_url
    except Exception as e:
        print(f"❌ 访问分享链接失败: {str(e)}")
        return None


def _fetch_ajax_params(session, secondary_url, base_domain, headers):
    """Step 2: load the iframe page and return ``(ajax_url, wp_sign, ajaxdata)``, or None."""
    try:
        print(f"步骤2: 访问二级链接 [{secondary_url}]")
        response = session.get(secondary_url, headers=headers, timeout=10)
        response.raise_for_status()
        page = response.text
        # The parameters are scraped from the inline JavaScript of the page.
        wp_sign_match = re.search(r"var\s+wp_sign\s*=\s*'([^']+)';", page)
        ajaxdata_match = re.search(r"var\s+ajaxdata\s*=\s*'([^']+)';", page)
        # The AJAX endpoint is recognised by the "//data///////" comment that
        # follows it in the page source.
        ajax_url_match = re.search(r"url\s*:\s*'([^']+)'\s*,\s*//data///////", page)
        if not (wp_sign_match and ajaxdata_match and ajax_url_match):
            print("❌ 未找到必要参数 (wp_sign, ajaxdata 或 ajax_url)")
            return None
        wp_sign = wp_sign_match.group(1)
        ajaxdata = ajaxdata_match.group(1)
        ajax_url = urljoin(base_domain, ajax_url_match.group(1))
        print(f"✅ 提取参数: wp_sign={wp_sign[:20]}... ajaxdata={ajaxdata}")
        print(f"✅ 构建AJAX请求URL: {ajax_url}")
        return ajax_url, wp_sign, ajaxdata
    except Exception as e:
        print(f"❌ 访问二级链接失败: {str(e)}")
        return None


def _request_file_url(session, ajax_url, wp_sign, ajaxdata, base_domain, headers):
    """Step 3: POST the scraped parameters to the AJAX endpoint and return the file URL, or None."""
    try:
        print(f"步骤3: 发送AJAX请求到 [{ajax_url}]")
        post_data = {
            'action': 'downprocess',
            'websignkey': ajaxdata,
            'signs': ajaxdata,
            'sign': wp_sign,
            'websign': '',
            'kd': 1,
            'ves': 1,
        }
        # Extra headers so the request looks like the page's own XHR call.
        ajax_headers = headers.copy()
        ajax_headers.update({
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'Origin': base_domain,
            'X-Requested-With': 'XMLHttpRequest',
        })
        response = session.post(ajax_url, data=post_data, headers=ajax_headers, timeout=10)
        response.raise_for_status()
        ajax_data = response.json()
        print(f"✅ AJAX响应: {json.dumps(ajax_data, ensure_ascii=False)}")
        # The code treats zt == 1 as the success status; "inf" is printed as
        # the failure detail otherwise.
        if ajax_data.get('zt') != 1:
            print(f"❌ AJAX响应状态错误: {ajax_data.get('inf')}")
            return None
        if not (ajax_data.get('dom') and ajax_data.get('url')):
            print("❌ AJAX响应缺少dom或url字段")
            return None
        fourth_url = _build_file_url(ajax_data['dom'], ajax_data['url'])
        print(f"✅ 构建第四级链接: {fourth_url}")
        return fourth_url
    except Exception as e:
        print(f"❌ AJAX请求失败: {str(e)}")
        return None


def _resolve_final_link(session, fourth_url, secondary_url, headers):
    """Step 4: request the file URL without following redirects and return the final link, or None."""
    try:
        print(f"步骤4: 访问第四级链接 [{fourth_url}] (禁止重定向)")
        # This request uses the iframe page as its Referer.
        location_headers = headers.copy()
        location_headers['Referer'] = secondary_url
        # stream=True: only the status line and headers are needed here. Without
        # it a 200 response would pull the entire file body into memory.
        with session.get(fourth_url, headers=location_headers,
                         allow_redirects=False, timeout=10, stream=True) as response:
            if response.status_code in (301, 302, 303, 307, 308):
                location = response.headers.get('Location')
                print(f"✅ 重定向响应,Location头: {location}")
                if not location:
                    print("❌ 重定向响应但没有Location头")
                    return None
                if not urlparse(location).scheme:
                    # Location may be relative; resolve it against the
                    # requested URL so the returned link is usable on its own.
                    location = urljoin(fourth_url, location)
                print(f"✅ 发现最终直链: {location}")
                return location
            if response.status_code == 200:
                # No redirect: accept the URL itself when it serves a
                # binary-looking content type.
                content_type = response.headers.get('Content-Type', '')
                if 'application/' in content_type or 'octet-stream' in content_type:
                    print("✅ 第四级链接就是最终直链")
                    return fourth_url
                print(f"❌ 返回200但内容类型异常: {content_type}")
                return None
            print(f"❌ 意外的状态码: {response.status_code}")
            return None
    except Exception as e:
        print(f"❌ 访问第四级链接失败: {str(e)}")
        return None


def get_lanzou_direct_link(share_url):
    """Resolve a Lanzou cloud share link to its direct download link.

    The lookup runs four HTTP steps on one shared session: load the share
    page, load the iframe page it embeds, POST the parameters scraped from
    that page to the AJAX endpoint, then request the resulting file URL with
    redirects disabled and read its ``Location`` header. Progress and errors
    are printed; no exception from these steps propagates to the caller.

    Args:
        share_url: Absolute share URL (scheme and host are reused for the
            follow-up requests).

    Returns:
        The direct link as a string, or None if any step fails.
    """
    parsed = urlparse(share_url)
    base_domain = f"{parsed.scheme}://{parsed.netloc}"
    # Browser-like headers sent with every request.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36',
        'Referer': base_domain + '/',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    }
    # The context manager closes the session (and its pooled connections)
    # on every exit path.
    with requests.Session() as session:
        secondary_url = _fetch_secondary_url(session, share_url, base_domain, headers)
        if not secondary_url:
            return None
        params = _fetch_ajax_params(session, secondary_url, base_domain, headers)
        if not params:
            return None
        ajax_url, wp_sign, ajaxdata = params
        fourth_url = _request_file_url(
            session, ajax_url, wp_sign, ajaxdata, base_domain, headers)
        if not fourth_url:
            return None
        return _resolve_final_link(session, fourth_url, secondary_url, headers)
def download_file(direct_url, file_name=None):
    """Download the file behind a final direct link, printing progress.

    NOTE(review): the ``def`` line of this routine was missing from the
    pasted source; the name ``download_file`` is reconstructed, while the
    parameter names are the ones the body reads. Confirm against any callers.

    Args:
        direct_url: Direct download URL. A falsy value returns None at once.
        file_name: Destination path. When omitted, the name is taken from the
            URL's ``fn`` query parameter, or else from the last segment of
            the URL path.

    Returns:
        The path that was written, or None if the download failed.
    """
    # Function-scope imports keep this block self-contained; the file's
    # top-level imports do not provide these names.
    import os
    from urllib.parse import parse_qs, unquote, urlparse

    if not direct_url:
        return None
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36',
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate'
        }
        if not file_name:
            url_parts = urlparse(direct_url)
            # parse_qs finds "fn" wherever it sits in the query string and
            # percent-decodes its value.
            fn_values = parse_qs(url_parts.query).get('fn')
            raw_name = fn_values[0] if fn_values else unquote(url_parts.path)
            # The name comes from a remote URL: keep only the final path
            # component so it cannot point outside the working directory.
            file_name = os.path.basename(raw_name.replace('\\', '/'))
            if not file_name:
                raise ValueError("无法从链接中确定文件名")
        print(f"开始下载: {file_name}")
        # The timeout bounds the connect and each read so a stalled server
        # cannot hang the call; the with-block releases the streamed
        # connection on every exit path.
        with requests.get(direct_url, headers=headers, stream=True, timeout=30) as response:
            if response.status_code != 200:
                print(f"下载失败,状态码: {response.status_code}")
                return None
            # 0 means the size is unknown and disables the percentage display.
            content_length = int(response.headers.get('Content-Length', 0))
            with open(file_name, 'wb') as f:
                downloaded = 0
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
                        downloaded += len(chunk)
                        if content_length > 0:
                            percent = (downloaded / content_length) * 100
                            print(f"\r下载进度: {percent:.1f}% ({downloaded}/{content_length} bytes)", end='', flush=True)
        print(f"\n✅ 文件下载成功: {file_name}")
        return file_name
    except Exception as e:
        print(f"❌ 下载文件失败: {str(e)}")
        return None
if __name__ == "__main__":
    # Command-line entry point: prompt for a share link, resolve it and
    # report the outcome.
    divider = "=" * 50

    # Links pasted without a scheme are assumed to be https.
    share_url = input("请输入蓝奏云分享链接: ").strip()
    if not share_url.startswith("http"):
        share_url = "https://" + share_url

    print(divider)
    print(f"开始处理: {share_url}")
    print(divider)

    direct_link = get_lanzou_direct_link(share_url)

    # Both outcomes are introduced by the same blank line and divider.
    print("\n" + divider)
    if not direct_link:
        print("❌ 获取直链失败,请检查分享链接")
    else:
        print(f"✅ 成功获取最终直链!")
        print(f"直链地址: {direct_link}")