"""Write comment metadata (threads and their replies) for a list of videos.

For every video id read from the input file, the comment threads are requested
through ``get_api_items`` (from ``common``), truncated reply lists are
completed with a second request per thread, and the result is written to
``<api_dir>/<videoId>_comments.json``.
"""
import sys
## Set before the remaining imports so no bytecode files are written for them
sys.dont_write_bytecode = True

import os
from pathlib import Path
import time
import logging
import argparse

import requests

from common import *

API_PART_COMMENT_THREADS = ['snippet', 'replies']
API_PART_COMMENTS = ['snippet', 'id']
API_PARAMS_COMMENT_THREADS = {
    'part': ','.join(API_PART_COMMENT_THREADS),
    'maxResults': 100,
    'order': 'time'}
API_PARAMS_COMMENTS = {
    'part': ','.join(API_PART_COMMENTS),
    'maxResults': 100}


## Parse arguments
def __parse_args(argv):
    """Parse the command-line arguments.

    Args:
        argv: Argument strings, without the program name.

    Returns:
        dict with the keys ``'input'`` (the input filename as given) and
        ``'api_dir'`` (a ``Path`` to the output directory).

    Raises:
        ValueError: If the input filename does not end in ``.txt``.
    """
    parser = argparse.ArgumentParser(
        description='Write comments metadata for videos.',
        formatter_class=argparse.RawTextHelpFormatter)
    ## Drop argparse's default group so only the two groups below are listed
    parser._action_groups.pop()
    required = parser.add_argument_group('required arguments')
    optional = parser.add_argument_group('optional arguments')
    required.add_argument(
        '-i', '--input', required=True,
        help='List of videoIds (https://www.youtube.com/watch?v={videoId})',
        metavar='videos.txt')
    optional.add_argument(
        '-d', '--api_dir', type=Path,
        default=Path(__file__).absolute().parent / 'videos',
        help='Output directory for API files. \nDefault is ./videos.',
        ## Was './playlists', which contradicted the help text and default
        metavar='./videos')
    args = parser.parse_args(argv)

    f_in = args.input
    api_dir = args.api_dir
    if not f_in.endswith('.txt'):
        raise ValueError('Input filename not a .txt file.')
    return {'input': f_in, 'api_dir': api_dir}


def _fetch_missing_replies(threads, session, err_log):
    """Complete, in place, the reply list of every thread that is truncated.

    A thread is left untouched when the replies it already carries match its
    ``totalReplyCount``; otherwise all replies are requested through
    ``get_api_items('comments', ...)`` and replace the embedded list.

    Args:
        threads: List of comment-thread dicts; modified in place.
        session: Session object handed to ``get_api_items``.
        err_log: Logger that receives per-thread failures.

    Returns:
        Tuple ``(total_comments, session)``: the number of threads plus the
        number of replies counted, and the session returned by the last
        successful ``get_api_items`` call (or the one passed in).
    """
    ## Copy so the module-level defaults are never mutated
    session.params = dict(API_PARAMS_COMMENTS)
    total_comments = len(threads)

    for thread in threads:
        ## Check if need to get more replies
        try:
            replies = thread['replies']['comments']
            total_replies = thread['snippet']['totalReplyCount']
        except (KeyError, TypeError):
            ## No reply list on this thread: nothing to complete
            continue

        if len(replies) == total_replies:
            total_comments += len(replies)
            continue

        try:
            session.params['parentId'] = thread['id']
            session.params.pop('pageToken', None)  ## Reset pageToken
            ret = get_api_items('comments', session)
            (js_reply, session, err_msg) = (
                ret['json'], ret['session'], ret['error'])
            if err_msg:
                err_log.error('{}: {}'.format(thread['id'], err_msg))
                time.sleep(1)
                continue
            thread['replies']['comments'] = js_reply
            total_comments += len(js_reply)
        except Exception as exc:
            ## Best effort: keep the replies already embedded in the thread,
            ## but record the failure instead of silently discarding it
            err_log.error('{}: {!r}'.format(thread.get('id'), exc))

    return total_comments, session


def main(args):
    """Fetch and write the comment threads of every video id in the input file.

    Args:
        args: Command-line argument strings, without the program name.

    Raises:
        ValueError: If the input filename does not end in ``.txt``.
    """
    ## Create error logger
    err_log = make_error_logger('error.log')

    ## Parse arguments
    parsed = __parse_args(args)
    f_in = parsed['input']
    api_dir = parsed['api_dir']

    ## Get Ids from input file
    ids_in = get_ids(f_in, 'videoId')

    ## Create output directory
    os.makedirs(api_dir, exist_ok=True)

    ## Iterate over Ids
    session = requests.Session()
    total_ids = len(ids_in)
    for i, _id in enumerate(ids_in):
        print('\n########################\n')
        print(f'Processing Id {_id} ({i+1}/{total_ids})...')

        ## Get API info
        ## A fresh copy per video: no stale pageToken, constants stay intact
        session.params = dict(API_PARAMS_COMMENT_THREADS)
        session.params['videoId'] = _id
        ret = get_api_items('commentThreads', session)
        (js, session, err_msg) = (ret['json'], ret['session'], ret['error'])
        if err_msg:
            err_log.error(f'{_id}: {err_msg}')
            time.sleep(1)
            continue

        print('Found {} comment thread(s).'.format(len(js)))
        if len(js) == 0:
            ## Nothing to complete or write for this video
            continue

        ## Get API info (replies)
        total_comments, session = _fetch_missing_replies(js, session, err_log)
        print(f'Found {total_comments} comment(s).')

        ## Write API info to file
        f_api = os.path.join(api_dir, f'{_id}_comments.json')
        print(f'Writing API info to {f_api}...')
        write_api(f_api, js)
        print('Write successful.')

    print('\nFinished!')


if __name__ == '__main__':
    main(sys.argv[1:])