python


13、爬取mooc视频、资料等

<h3>1、简单说明</h3> <p>main\download\unixtime 三个模块 放在一个文件夹下,下载bs urlib等必要模块,运行main.py</p> <h3>main.py</h3> <pre><code>import requests # HTTP import re # 正则表达式 import test # 网页爬虫调试 import unixtime # Unix时间戳 import download # 文件下载 from urllib.parse import quote # URL编码解码 from prettytable import PrettyTable, FRAME, NONE # 表格输出 import time import random # 搜索页面相关的全局变量 pageIndex = 0 # 页码 totlePageCount = 0 # 总页数 curPageCount = 0 # 当前页的课程数 totleCount = 0 # 搜索到的总课程数 courseIndex = 0 # 课程编号 # 请求头 headers = {'Host': 'www.icourse163.org', 'Connection': 'keep-alive', 'Origin': 'https://www.icourse163.org', 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36', 'Content-Type': 'text/plain'} # 搜索课程,爬取搜索结果页面 def search_course(keyword, pageIndex=1): url = 'https://www.icourse163.org/dwr/call/plaincall/MocSearchBean.searchMocCourse.dwr' status = 30 pageSize = 20 data = {'callCount': '1', 'scriptSessionId': '${scriptSessionId}190', 'httpSessionId': 'bd4f183dd74746aa83b2cced56a0795b', 'c0-scriptName': 'MocSearchBean', 'c0-methodName': 'searchMocCourse', 'c0-id': '0', 'c0-e1': 'string:' + quote(keyword), 'c0-e2': 'number:{}'.format(pageIndex), 'c0-e3': 'boolean:true', 'c0-e4': 'null:null', 'c0-e5': 'number:0', 'c0-e6': 'number:{}'.format(status), # 0-已结束; 10-正在进行; 20-即将开始; 30-所有课程 'c0-e7': 'number:{}'.format(pageSize), 'c0-param0': 'Object_Object:{keyword:reference:c0-e1,pageIndex:reference:c0-e2,highlight:reference:c0-e3,categoryId:reference:c0-e4,orderBy:reference:c0-e5,stats:reference:c0-e6,pageSize:reference:c0-e7}', 'batchId': '1528898317310'} # test.printDICT(data) # 测试所用 try: r = requests.post(url, headers=headers, data=data) r.raise_for_status() # test.detect_encoding(r) # 检测到响应的编码时'ascii' page = r.text.encode('utf-8').decode('unicode_escape') # 解码为 unicode_escape 便于print将汉字打印输出 # print(page[3000:4000]) # 测试所用 # test.outputHTML(page, '搜索页面第 ' + str(pageIndex) + ' 页') return page except requests.HTTPError as ex: print('课程搜索页面访问出错...\n[-]ERROR: %s' % str(ex)) raise # 解析搜索结果的页面 def parse_search(page): # 页面信息解析 global pageIndex, totleCount, totlePageCount, curPageCount re_pageInfo = r'pageIndex=(\d+);.*totleCount=(\d+);.*totlePageCount=(\d+);' list_pageInfo = re.findall(re_pageInfo, page) # 得到一个多维列表形式的匹配结果 pageIndex = int(list_pageInfo[0][0]) totleCount = int(list_pageInfo[0][1]) totlePageCount = int(list_pageInfo[0][2]) # 课程信息解析 # 0 - cid(无用); 1 - 课程名; 2 - 授课教师; 3 - 院校; 4 - tid,termId page = re.sub(r'({##)|(##})', '', page) # 删除page中的#{}符号 re_courseInfo = r'cid=(\d+);.*highlightName="(.+)";.*highlightTeacherNames="(.+)";.*highlightUniversity="(.+)";' \ r'.+\W{0,4}.+currentTermId=(\d+);' list_courseInfo = re.findall(re_courseInfo, page) # 课程状态解析 # 0 - 结束时间; 1 - 参加人数; 2 - 介绍 3 - 开始时间; re_courseStat = r'endTime=(\d+);.*?enrollCount=(\d+);.*?jsonContent="(.+[\s\S]{0,120}.+)";.*startTime=(\d+);' list_courseStat = re.findall(re_courseStat, page) curPageCount = len(list_courseInfo) return list_courseInfo, list_courseStat # 输出搜索到的页面信息与课程列表 def disp_search(courseInfo): print('&gt;&gt;&gt; 搜索到 %d 条相关结果\t\t当前第 %d 页\t\t共 %d 页\t\t向上翻页:[u]\t\t向下翻页:[d]\t\t重新搜索:[e]' % (totleCount, pageIndex, totlePageCount)) table = PrettyTable([' ', "编号", "课程名( 输入序号查看详情 )", "授课教师", "开设院校"]) table.align = 'l' table.vrules = FRAME table.vertical_char = ' ' table.align["授课教师"] = 'c' table.align["开设院校"] = 'c' for i, item in zip(range(curPageCount), courseInfo): table.add_row([' ', i, item[1][:20], item[2][:11], item[3][:10]]) print(table) # print('当前页%d条' % curPageCount) # 交互 def user_interface(courseInfo, courseStat): global pageIndex, courseIndex while True: cmd = input('&gt;&gt;&gt; 请输入命令(课程编号/u/d/e):').strip() if re.match(r'^\d\d?\W*$', cmd): courseIndex = eval(cmd) if 0 &lt;= courseIndex &lt;= curPageCount: try: courseName = courseInfo[courseIndex][1].strip() courseAbstract = courseStat[courseIndex][2] startTime = unixtime.number2time(courseStat[courseIndex][3], 13) endTime = unixtime.number2time(courseStat[courseIndex][0], 13) enrollCount = courseStat[courseIndex][1] print('&gt;&gt;&gt;', courseName, ':', courseAbstract, '\n') print('\t开课时间:%s\n\t结束时间:%s\n\t已参加人数:%s\n' % (startTime, endTime, enrollCount)) except: print('&gt;&gt;&gt; 该课程暂未开通,请访问以下链接进行确认!') print('&gt;&gt;&gt; https://www.icourse163.org/search.htm?search={}#/'.format(quote(courseName))) main() cmd = input('&gt;&gt;&gt; 按下Enter键选定该课程,退出选定请按其他键: ') if re.match(r'^.*?\n*$', cmd, re.I): cmd = input('&gt;&gt;&gt; 下载视频(v)还是课件(p)?退出下载请按其他键: ') # 3代表文档,1代表视频 if re.match(r'^v\W*$', cmd, re.I): print('&gt;&gt;&gt; 正在进行视频资源分析,请稍后...') return 1 elif re.match(r'^p\W*$', cmd, re.I): print('&gt;&gt;&gt; 正在进行课件资源分析,请稍后...') return 3 else: print('&gt;&gt;&gt; 已退出下载!') else: print('&gt;&gt;&gt; 已退出选定!') else: print('&gt;&gt;&gt; 课程编号输入错误,请重新输入!') elif re.match(r'^e\W*$', cmd, re.I): main() elif re.match(r'^u\W*$', cmd, re.I): if pageIndex &lt;= 1: print('&gt;&gt;&gt; 已经是首页了...') else: pageIndex -= 1 break elif re.match(r'^d\W*$', cmd, re.I): if pageIndex &gt;= totlePageCount: print('&gt;&gt;&gt; 已经是尾页了...') else: pageIndex += 1 break else: print('&gt;&gt;&gt; 命令错误,请重新输入!') return False # 刷新搜索页面 # 获取资源页列表 def get_source_list(tid): # 2018.6 抓取的数据包没有 getMocTermDto.dwr,怀疑是MOOC改版了。只能通过查getLastLearnedMocTermDto.dwr得到资源列表 # 但是这就需要加入Cookie了,还要保持对话什么的...不是很方便,就暂时还用网络大神的老版本吧 # url = 'https://www.icourse163.org/dwr/call/plaincall/CourseBean.getLastLearnedMocTermDto.dwr' # POST请求,视频链接在r的js中 url = 'http://www.icourse163.org/dwr/call/plaincall/CourseBean.getMocTermDto.dwr' # 网络大神的旧链接 data = {'callCount': '1', 'scriptSessionId': '${scriptSessionId}190', 'c0-scriptName': 'CourseBean', 'c0-methodName': 'getMocTermDto', 'c0-id': 0, 'c0-param0': 'number:' + tid, # tid,termId 'c0-param1': 'number:1', 'c0-param2': 'boolean:true', 'batchId': unixtime.now()} try: r = requests.post(url, headers=headers, data=data) r.raise_for_status() # test.detect_encoding(r) # 检测到响应的编码时'ascii' page = r.text.encode('utf-8').decode('unicode_escape') # 解码为 unicode_escape 便于print将汉字打印输出 # print(page[3000:4000]) # 测试所用 # test.outputHTML(page, '获取资源列表') return page except requests.HTTPError as ex: print('&gt;&gt;&gt; 课程搜索页面访问出错...\n[-]ERROR: %s' % str(ex)) raise # 解析资源页列表,得到下载请求的data包参数:contentId,contentType(传入),文件id,文件name def parse_source(page, sourceType): # 3代表文档,1代表视频 ch = '段视频' if sourceType is 1 else '份课件' # 0 - cid; 1 - id; 2 - name re_sourceList = r'anchorQuestions=.*contentId=(\d*);.*contentType={};.*id=(\d*);.*name="(.*)";'.format( sourceType) sourceList = re.findall(re_sourceList, page) if not sourceList: print('&gt;&gt;&gt; Source List is Empty!') else: print('&gt;&gt;&gt; 本课程共有', len(sourceList), ch, end=',') return sourceList # 解析下载链接: 1代表视频, 3代表文档 def get_download_info(dataList, sourceType, Quality=None, fileFormat=None): url = 'http://www.icourse163.org/dwr/call/plaincall/CourseBean.getLessonUnitLearnVo.dwr' content_id = dataList[0] file_id = dataList[1] file_name = re.sub(r'[/\\*|&lt;&gt;:?"]', '', dataList[2]) # 移除Windows文件名非法字符 data = {'callCount': '1', 'scriptSessionId': '${scriptSessionId}190', 'c0-scriptName': 'CourseBean', 'c0-methodName': 'getLessonUnitLearnVo', 'c0-id': '0', 'c0-param0': 'number:' + content_id, # contentId 'c0-param1': 'number:{}'.format(sourceType), 'c0-param2': 'number:0', 'c0-param3': 'number:' + file_id, # 文件id 'batchId': unixtime.now()} try: r = requests.post(url, headers=headers, data=data) r.raise_for_status() page = r.text # test.outputHTML(page,'下载链接') except requests.HTTPError as ex: print('课程搜索页面访问出错...\n[-]ERROR: %s' % str(ex)) raise if Quality: # 进行视频文件的解析 re_videoLink = r'{}{}Url="(.+?)";'.format(fileFormat, Quality) video_url = re.findall(re_videoLink, page) re_srtLink = r's\d+\.name="([\w\\]+?)";s\d+\.url="(.+?)";' srt_url = re.findall(re_srtLink, page) if video_url: if srt_url: return [video_url[0], srt_url[0][1]], file_name else: return [video_url[0]], file_name else: return [], file_name else: # 进行课件文件的解析 re_PDFLink = r'http://nos.netease.com/.*?\.pdf' pdf_url = re.findall(re_PDFLink, page) if pdf_url: return [pdf_url[0]], file_name else: return [], file_name # 批量下载所有资源: 1代表视频, 3代表文档 def mass_dowmload(sourceList, sourceType): download_cnt = 0 direction = download.select_direction() if sourceType is 1: # 视频下载 qualityList = ['Hd', 'Sd', 'Shd', 'Hd', 'Sd', 'Shd'] formatList = ['flv', 'flv', 'flv', 'mp4', 'mp4', 'mp4'] while True: index = '4' #input('&gt;&gt;&gt; 请选择视频格式:\n\t0-FLV高清,1-FLV标清,2-FLV超清\n\t3-MP4高清,4-MP4标清,5-MP4超清\n&gt;&gt;&gt; ') if re.match(r'\d', index): index = int(index) # 将字符串数字转为数值 if 0 &lt;= index &lt;= 5: quality = qualityList[index] fileFormat = formatList[index] break else: print('&gt;&gt;&gt; 选择错误!') else: quality = None fileFormat = 'pdf' skipNum = input('&gt;&gt;&gt; 是否继续下载?若是,请输入跳过文件数,否则请按Enter键:') if re.match(r'\d', skipNum): skipNum = int(skipNum) else: skipNum = 0 for item in sourceList: time.sleep(random.randint(1, 3)) download_cnt += 1 if download_cnt &lt;= skipNum: continue (url, name) = get_download_info(item, sourceType, quality, fileFormat) print(url) print('\n[+]正在下载第{}份 - {}.{} ...\n'.format(download_cnt, name, fileFormat)) if len(url) is 1: download.download(url[0], direction, name, fileFormat,download_cnt) # 课件或课件 elif len(url) is 2: download.download(url[0], direction, str(download_cnt)+'、'+name, fileFormat,download_cnt) # 视频 download.download(url[1], direction, str(download_cnt)+'、'+name, 'srt',download_cnt) # 字幕 'smallfile' else: print('[-]ERROR:URL is None') print('\n&gt;&gt;&gt; 下载完成!本次总计下载资源{}份'.format(download_cnt)) # 主函数 def main(): keyword = input('&gt;&gt;&gt; 请输入关键字搜索课程:') #如 'Python语言程序设计' # keyword = test.inputString('机器学习') global pageIndex pageIndex = 1 while True: searchPage = search_course(keyword, pageIndex) (courseInfo, courseStat) = parse_search(searchPage) disp_search(courseInfo) download_type = user_interface(courseInfo, courseStat) if download_type: sourcePage = get_source_list(courseInfo[courseIndex][4]) sourceList = parse_source(sourcePage, download_type) if sourceList: cmd = input('请按下Enter键确认下载,取消请按其他键: ') if re.match(r'^.*\n*$', cmd, re.I): mass_dowmload(sourceList, download_type) else: print('&gt;&gt;&gt; 已退出下载!') break if __name__ == '__main__': main() </code></pre> <h4>download.py</h4> <pre><code>import os import requests from urllib import error # 下载路径选择 def select_direction(): currentDir = os.getcwd() while True: path = "./download/"#input('&gt;&gt;&gt; 请输入保存路径:(默认在当前目录-{}下创建)\n&gt;&gt;&gt; '.format(currentDir)) # 获得当前文件夹 if not path: print('&gt;&gt;&gt; 非法路径!') continue if not os.path.isdir(path): # 检测是否是文件夹 os.mkdir(path) # 在当前目录下创建文件夹,path = 相对路径 break return path # 下载文件 def download(url, direction, fileName, fileType,download_cnt): print(url) global mode if fileType == 'srt': url =url mode = 2 elif fileType == 'pdf': url = url mode = 2 else: # 如果是视频,替换下载链接的前缀 mode = 1 #url1 = 'https://jdvodrvfb210d.vod.126.net/jdvodrvfb210d/nos' url = url #1 + url[35:] print(url) # 文件的绝对路径,如 D:\Program Files\Python36\python.exe abs_fileName = '{}/{}.{}'.format(direction, fileName, fileType) renameCount = 0 while True: # 检查是否重名 if os.path.exists(abs_fileName): abs_fileName = '{}/{}-{}.{}'.format(direction, str(download_cnt)+'、'+fileName, renameCount, fileType) else: break # 小文件模式:直接下载 #mode = 1 if mode == 2: try: r = requests.get(url) r.raise_for_status() with open(abs_fileName, 'wb') as file: file.write(r.content) except requests.HTTPError as ex: print('[-]ERROR: %s' % ex) except KeyboardInterrupt: os.remove(abs_fileName) raise return # 大文件模式:分块下载 try: r = requests.get(url, stream=True) r.raise_for_status() if 'Content-Length' not in r.headers: raise requests.HTTPError('No Content Length') file_size = int(r.headers['Content-Length']) # 文件大小:B if file_size &lt; 10 * 1024 * 1024: chunk_size = 1024 * 1024 # 分块大小 B else: chunk_size = 3 * 1024 * 1024 download_size = 0 # 已下载大小:B with open(abs_fileName, 'wb') as file: for chunk in r.iter_content(chunk_size=chunk_size): progress = download_size / file_size * 100 # 下载进度 prompt_bar = '[{:50}] {:.1f}%\tSize: {:.2f}MB'.format( '=' * int(progress / 2), progress, download_size / 1024 / 1024) print(prompt_bar, end='\r') # \r 代表打印头归位,回到某一行的开头 file.write(chunk) download_size += chunk_size print(str(download_cnt)+abs_fileName+' - ******************') #os.rename(abs_fileName, str(download_cnt)+abs_fileName) print('[{:50}] 100% Done!\tSize: {:.2f}MB'.format('=' * 50, file_size / 1024 / 1024)) except error.HTTPError as ex: print('[-]ERROR: %s' % ex) print("--------------") except KeyboardInterrupt: os.remove(path) raise </code></pre> <h4>unixtime.py</h4> <pre><code>import time # Unix时间戳 # Unix时间戳解码,如string ='1534348800000', number_bits = 13, 毫秒级, 返回值 = '2018/8/16 0:0:0' def number2time(string, number_bits=13): timeFormat = '%Y-%m-%d %H:%M:%S' if number_bits is 10: # 秒级 value = eval(string) elif number_bits is 13: # 毫秒级 value = eval(string) / 1000 localTime = time.localtime(value) date = time.strftime(timeFormat, localTime) return date # 获取当前的Unix时间戳,如获取秒级, number_bits = 10, 返回值 = 1534348800 def now(number_bits=13): second = time.time() if number_bits is 10: # 秒级 return round(second) elif number_bits is 13: # 毫秒级 millisecond = second * 1000 return round(millisecond) else: return None </code></pre>

页面列表

ITEM_HTML