# Facebook Graph API scraper -- gist by @mediaczar, created November 12, 2018
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Main collection script: walks a Facebook page or group feed via the Graph
# API (using the fbtools helper module below) and saves posts, comments,
# reactions and insights to sqlite.
import fbtools as fb

fb.token_string = fb.get_token()
args = fb.get_args()

if args.page:
    feed_type = 'posts'
    profile = args.page[0]
elif args.group:
    feed_type = 'feed'
    profile = args.group[0]

if args.debug:
    print '''
******************
*** DEBUG MODE ***
******************
'''
if args.insights:
    print '''
*********************
*** INSIGHTS MODE ***
*********************
'''

# optional date window for the feed query
dates = {}
if args.since:
    dates['since'] = "&since=%s" % args.since[0]
if args.until:
    dates['until'] = "&until=%s" % args.until[0]
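# e.g. `--since 2018-01-01` yields the suffix '&since=2018-01-01' (the date
# is illustrative), appended to the feed URL in the main loop below.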
# MAIN LOOP
graph_url = fb.build_graph_url(profile, feed_type)
print graph_url
# append the since/until date window, if any
for k in dates:
    graph_url = graph_url + dates[k]

while graph_url is not None:
    posts = fb.get_content(graph_url)
    for post in posts['data']:
        # flatten and save the post
        post_dict = fb.create_dict(post, 'post')
        print 'saving post\t%s' % post_dict['id']
        fb.save_db(post_dict, 'id', 'postdata')
        # collect summary counts for shares, 1st-degree comments, reactions
        print 'collecting summary data'
        graph_url = fb.build_graph_url(post_dict['id'], 'summaries')
        summaries = fb.get_content(graph_url)
        summaries_dict = fb.create_dict(summaries, 'summaries')
        fb.save_db(summaries_dict, 'id', 'summaries')
        # collect insights if applicable
        if args.insights:
            print 'collecting insights data'
            graph_url = fb.build_graph_url(post_dict['id'], 'insights')
            insights = fb.get_content(graph_url)
            for insight in insights['data']:
                insight_dict = fb.create_dict(insight, 'insight', post_dict['id'])
                fb.save_db(insight_dict, 'id', insight['name'])
        if args.comments:
            # collect comments
            print 'collecting granular comment data'
            graph_url = fb.build_graph_url(post_dict['id'], 'comments')
            while graph_url is not None:
                comments = fb.get_content(graph_url)
                for comment in comments['data']:
                    # flatten and save the comment
                    comment_dict = fb.create_dict(comment, 'comment', post_dict['id'])
                    fb.save_db(comment_dict, 'id', 'commentdata')
                    # collect comment replies
                    graph_url = fb.build_graph_url(comment_dict['id'], 'comments')
                    while graph_url is not None:
                        replies = fb.get_content(graph_url)
                        for reply in replies['data']:
                            reply_dict = fb.create_dict(reply, 'comment', post_dict['id'])
                            fb.save_db(reply_dict, 'id', 'commentdata')
                        # get next page of replies
                        graph_url = fb.next_page(replies, 'replies')
                    # collect comment likes
                    if args.reactions:
                        graph_url = fb.build_graph_url(comment_dict['id'], 'likes')
                        while graph_url is not None:
                            likes = fb.get_content(graph_url)
                            for like in likes['data']:
                                like_dict = fb.create_dict(like, 'like', comment_dict['id'])
                                fb.save_db(like_dict, 'id', 'commentlikedata')
                            # get next page of likes
                            graph_url = fb.next_page(likes, 'likes')
                # get next page of comments
                graph_url = fb.next_page(comments, 'comments')
        if args.reactions:
            # collect reactions
            print 'collecting granular reaction data'
            graph_url = fb.build_graph_url(post_dict['id'], 'reactions')
            while graph_url is not None:
                reactions = fb.get_content(graph_url)
                for reaction in reactions['data']:
                    reaction_dict = fb.create_dict(reaction, 'reaction', post_dict['id'])
                    fb.save_db(reaction_dict, 'id', 'reactiondata')
                # get next page of reactions
                graph_url = fb.next_page(reactions, 'reactions')
    # in debug mode, stop after the first page of posts
    if args.debug:
        print '''
*****************
*** DEBUG END ***
*****************
'''
        break
    # get next page of posts
    print 'attempting to collect next page of post data'
    graph_url = fb.next_page(posts, 'posts')

print '''
******************
***  SUCCESS!  ***
******************
'''
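# Example invocation (a sketch: the gist doesn't give this file's name, so
# `scrape.py` below is a placeholder; assumes generate_token.py has already
# written .fb_long_lived_token to the working directory):
#
#   python scrape.py --page somebrandpage --since 2018-01-01 -c -r -i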
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# fbtools.py: helper module (imported above as `fb`) wrapping the Facebook
# Graph API v2.8 and scraperwiki's sqlite store.
import json
import time
from time import gmtime, strftime

import scraperwiki
import argparse
import requests
from requests.exceptions import ConnectionError

api_root = 'https://graph.facebook.com/v2.8/'

# fields requested for each post
post_fields = ['id',
               'from',
               'created_time',
               'type',
               'message',
               'permalink_url',
               'shares',
               'admin_creator',
               'link',
               'application']

# fields requested for each comment and reply
comment_fields = ['id',
                  'created_time',
                  'from',
                  'message',
                  'like_count',
                  'comment_count',
                  'parent']

# insights metrics requested per post (requires `read_insights`)
insights_fields = ['post_impressions_by_paid_non_paid',
                   'post_impressions_by_paid_non_paid_unique',
                   'post_consumptions_by_type',
                   'post_consumptions_by_type_unique',
                   'post_impressions_fan_unique',
                   'post_impressions_viral_unique',
                   'post_fan_reach',
                   'post_engaged_fan',
                   'post_engaged_users']

reaction_types = ['ANGRY',
                  'HAHA',
                  'LIKE',
                  'LOVE',
                  'SAD',
                  'WOW']

# build one aliased summary-count field per reaction type
reaction_field_aliases = []
for reaction_type in reaction_types:
    reaction_field_aliases.append('reactions.type(%s).limit(0).summary(total_count).as(%s)' %
                                  (reaction_type, reaction_type))
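# e.g. the LIKE entry expands to:
#   reactions.type(LIKE).limit(0).summary(total_count).as(LIKE)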
def get_args(argv=None):
    '''parse command-line options'''
    parser = argparse.ArgumentParser(description='''
        Queries the Facebook Graph API for a page or group given at
        the command line. A long-lived Facebook access token must be
        present in the working directory (use generate_token.py to
        create one). Groups are referenced by their numeric id; the
        simplest way to find this is to inspect the page's HTML source
        and search for the fb://group/ metadata.''')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-p', '--page',
                       nargs=1,
                       type=str,
                       help='page profile: collects posts made by the page.')
    group.add_argument('-g', '--group',
                       nargs=1,
                       type=str,
                       help='''group id: collects posts from the group
                       (requires `user_managed_groups` token scope)''')
    parser.add_argument('--debug',
                        action='store_true',
                        help='only collect the 1st page of data.')
    parser.add_argument('-i', '--insights',
                        action='store_true',
                        help='''collect post insights
                        (requires `read_insights` token scope)''')
    parser.add_argument('-c', '--comments',
                        action='store_true',
                        help='collect granular comment & reply data')
    parser.add_argument('-r', '--reactions',
                        action='store_true',
                        help='collect granular reaction data')
    parser.add_argument('--since',
                        nargs=1,
                        type=str,
                        help='only collect posts after this date (yyyy-mm-dd)')
    parser.add_argument('--until',
                        nargs=1,
                        type=str,
                        help='only collect posts before this date (yyyy-mm-dd)')
    return parser.parse_args(argv)
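# e.g. get_args(['-p', 'somepage', '--since', '2018-01-01']) returns a
# namespace with page=['somepage'], group=None, since=['2018-01-01'], and
# the boolean flags False ('somepage' is a hypothetical page name).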
def get_token():
    '''collect the pre-generated long-lived token (see generate_token.py)'''
    token_file = '.fb_long_lived_token'
    with open(token_file, 'r') as f:
        # strip the trailing newline so the token can be spliced into URLs
        return f.read().strip()
def build_graph_url(object_id, request_type):
    '''build the appropriate Graph API request URL'''
    if request_type in ('posts', 'feed'):
        url = api_root + '%s/%s?fields=%s&access_token=%s' % (object_id,
                                                              request_type,
                                                              ",".join(post_fields),
                                                              token_string)
    elif request_type == 'comments':
        url = api_root + '%s/comments?fields=%s&access_token=%s' % (object_id,
                                                                    ",".join(comment_fields),
                                                                    token_string)
    elif request_type == 'likes':
        url = api_root + '%s/likes?access_token=%s' % (object_id,
                                                       token_string)
    elif request_type == 'reactions':
        url = api_root + '%s/reactions?access_token=%s' % (object_id,
                                                           token_string)
    elif request_type == 'insights':
        # insights metrics are requested as a path segment:
        # /{post-id}/insights/{metric1,metric2,...}
        url = api_root + '%s/%s/%s?access_token=%s' % (object_id,
                                                       request_type,
                                                       ",".join(insights_fields),
                                                       token_string)
    elif request_type == 'summaries':
        url = api_root + '%s?fields=%s,%s,%s&access_token=%s' % (object_id,
                                                                 ",".join(reaction_field_aliases),
                                                                 'shares',
                                                                 'comments.limit(0).summary(total_count)',
                                                                 token_string)
    return url
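# A sketch of the URLs this builds (hypothetical ids, token elided):
#   build_graph_url('somepage', 'posts')
#   -> https://graph.facebook.com/v2.8/somepage/posts?fields=id,from,...&access_token=...
#   build_graph_url('12345_67890', 'reactions')
#   -> https://graph.facebook.com/v2.8/12345_67890/reactions?access_token=...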
def get_content(url):
    '''retrieve JSON from the Graph API, retrying once after a connection error'''
    try:
        call = requests.get(url)
    except ConnectionError as e:
        print e
        time.sleep(1)
        call = requests.get(url)
    content = json.loads(call.text)
    return content
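# Note: Graph API errors also come back as JSON (an 'error' object), so a
# failed call yields a dict without a 'data' key here rather than raising.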
def create_dict(object_json, object_type, parent_id=None):
    '''return a flat dictionary from Graph API JSON'''
    object_dict = {}
    if object_type == 'post':
        object_dict = {
            'user_name': object_json['from']['name'],
            'user_id': object_json['from']['id'],
            'id': object_json['id'],
            'created_time': object_json['created_time'],
            'type': object_json['type'],
        }
        # optional post fields
        for key in ('link', 'message', 'object_id'):
            if key in object_json:
                object_dict[key] = object_json[key]
        if 'application' in object_json:
            object_dict['application_name'] = object_json['application']['name']
        if 'admin_creator' in object_json:
            object_dict['admin_creator_name'] = object_json['admin_creator']['name']
    elif object_type == 'comment':
        object_dict = {
            'post_id': parent_id,
            'id': object_json['id'],
            'user_name': object_json['from']['name'],
            'user_id': object_json['from']['id'],
            'created_time': object_json['created_time'],
            'message': object_json['message'],
            'like_count': object_json['like_count'],
            'comment_count': object_json['comment_count']
        }
        # replies carry a pointer to their parent comment
        if 'parent' in object_json:
            object_dict['parent_id'] = object_json['parent']['id']
    elif object_type == 'reaction':
        object_dict['post_id'] = parent_id
        object_dict['user_id'] = object_json['id']
        object_dict['user_name'] = object_json['name']
        object_dict['type'] = object_json['type']
        object_dict['id'] = '%s_%s' % (parent_id, object_json['id'])
    elif object_type == 'like':
        object_dict['comment_id'] = parent_id
        object_dict['user_id'] = object_json['id']
        object_dict['user_name'] = object_json['name']
        object_dict['id'] = '%s_%s' % (parent_id, object_json['id'])
    elif object_type == 'insight':
        # insight values may be scalars or nested dicts (e.g. consumptions-by-type)
        if isinstance(object_json['values'][0]['value'], dict):
            object_dict = object_json['values'][0]['value']
        else:
            object_dict['value'] = object_json['values'][0]['value']
        object_dict['postid'] = parent_id
        object_dict['date'] = strftime("%Y-%m-%d %H:%M:%S +0000", gmtime())
        object_dict['id'] = parent_id + "_" + strftime("%Y%m%d%H%M%S", gmtime())
    elif object_type == 'summaries':
        for summary_type in object_json:
            try:
                object_dict[summary_type] = object_json[summary_type]['summary']['total_count']
            except (TypeError, KeyError):
                pass  # non-summary entries such as 'id' and 'shares'
        object_dict['postid'] = object_json['id']
        object_dict['date'] = strftime("%Y-%m-%d %H:%M:%S +0000", gmtime())
        object_dict['comments'] = object_json['comments']['summary']['total_count']
        object_dict['id'] = object_dict['postid'] + "_" + strftime("%Y%m%d%H%M%S", gmtime())
        try:
            object_dict['shares'] = object_json['shares']['count']
        except KeyError:
            pass  # posts with no shares omit the field
    return object_dict
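# e.g. a like {'id': '100001', 'name': 'A User'} on comment '555_666'
# (hypothetical values) flattens to:
#   {'comment_id': '555_666', 'user_id': '100001',
#    'user_name': 'A User', 'id': '555_666_100001'}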
def save_db(object_dict, uid, table):
    '''save a dictionary to sqlite'''
    scraperwiki.sqlite.save(unique_keys=[uid],
                            table_name=table,
                            data=object_dict)
def next_page(content, content_type):
    '''return the next-page URL from the response's paging data,
    or None when there are no more pages'''
    try:
        return content['paging']['next']
    except KeyError:
        return None
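# Typical paging loop (mirrors the main script above):
#   while graph_url is not None:
#       page = get_content(graph_url)
#       ...process page['data']...
#       graph_url = next_page(page, 'posts')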
def get_count(object_id, content_type):
    '''utility function to collect a summary count
    (standalone helper; not called by the main script)'''
    graph_url = api_root + '%s/%s?summary=true&access_token=%s' % (object_id,
                                                                   content_type,
                                                                   token_string)
    content = get_content(graph_url)
    try:
        count = content['summary']['total_count']
    except KeyError:
        count = None
    return count
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# generate_token.py: obtains a short-lived Facebook token via the browser
# OAuth flow, then exchanges it for a long-lived one (adapted from
# https://github.com/fbsamples/fbconsole).
import os.path
import json
import urllib2
import BaseHTTPServer
import webbrowser
from urlparse import urlparse, parse_qs
from urllib import urlencode

APP_ID = '686972531321586'
APP_SECRET = 'a022f30d67004bef1e5151acdffca65e'
SERVER_PORT = 8080
REDIRECT_URI = 'http://127.0.0.1:%s/' % SERVER_PORT
ACCESS_TOKEN = None
LOCAL_FILE = '.fb_access_token'
LONG_LIVED_TOKEN_FILE = '.fb_long_lived_token'
AUTH_SCOPE = ['user_managed_groups', 'read_insights']
class _RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    def do_GET(self):
        global ACCESS_TOKEN
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        params = parse_qs(urlparse(self.path).query)
        ACCESS_TOKEN = params.get('access_token', [None])[0]
        if ACCESS_TOKEN:
            data = {'scope': AUTH_SCOPE,
                    'access_token': ACCESS_TOKEN}
            open(LOCAL_FILE, 'w').write(json.dumps(data))
            self.wfile.write("You have successfully logged in to Facebook with fbconsole. "
                             "You can close this window now.")
        else:
            # Facebook returns the token in the URL fragment, which never
            # reaches the server; this page bounces the fragment into the
            # query string and reloads so do_GET can read it
            self.wfile.write('<html><head>'
                             '<script>location = "?"+location.hash.slice(1);</script>'
                             '</head></html>')
def authenticate():
    """Authenticate with Facebook so you can make API calls that require auth.
    To request particular permissions, set the AUTH_SCOPE global variable to
    the list of permissions you want. (https://github.com/fbsamples/fbconsole)
    """
    global ACCESS_TOKEN
    print "Logging you in to Facebook..."
    webbrowser.open('https://www.facebook.com/dialog/oauth?' +
                    urlencode({'client_id': APP_ID,
                               'redirect_uri': REDIRECT_URI,
                               'response_type': 'token',
                               'scope': ','.join(AUTH_SCOPE)}))
    # serve requests until the redirect handler captures a token
    httpd = BaseHTTPServer.HTTPServer(('127.0.0.1', SERVER_PORT), _RequestHandler)
    while ACCESS_TOKEN is None:
        httpd.handle_request()
def exchangetoken():
    """Exchange the short-lived token (~120 mins) for a long-lived one (60 days).
    (https://developers.facebook.com/docs/facebook-login/access-tokens/expiration-and-extension)
    """
    if os.path.exists(LOCAL_FILE):
        # the cached copy isn't actually used; the exchange relies on the
        # ACCESS_TOKEN global set by authenticate()
        data = open(LOCAL_FILE).read()
    graph_url = ('https://graph.facebook.com/oauth/access_token?' +
                 urlencode({'grant_type': 'fb_exchange_token',
                            'client_id': APP_ID,
                            'client_secret': APP_SECRET,
                            'fb_exchange_token': ACCESS_TOKEN}))
    print graph_url
    response = urllib2.urlopen(graph_url)
    token = json.loads(response.read())['access_token']
    print token
    open(LONG_LIVED_TOKEN_FILE, 'w').write(token)
authenticate()
exchangetoken()
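# Running `python generate_token.py` opens a browser window for the OAuth
# dialog, captures the short-lived token via the local redirect server on
# port 8080, and writes the long-lived token to .fb_long_lived_token for
# fbtools.get_token() to read.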