BP2024/harvester/Facebook/facebook_crawler.py

134 lines
5.5 KiB
Python
Raw Normal View History

2024-04-09 13:39:11 +00:00
import os
import sys
import time
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
import selenium.common.exceptions
from Facebook.facebook_parser import FacebookParser
from crawler import Crawler
class FacebookCrawler(Crawler, FacebookParser):
    """Selenium-driven crawler for Facebook post/video comment sections.

    Expands the full comment tree (more comments, replies, "See more")
    and hands the rendered page source to :class:`FacebookParser`.
    """

    def __init__(self, base_url: str, file_name: str):
        """Load the XPath locator map and the list of target URLs.

        Reads ``locators.json`` from the working directory and the URL
        list from ``Facebook/inputs/<filename>``. Exits the process with
        status 1 when either file is missing (this is a CLI tool).
        """
        super().__init__(base_url, file_name)
        try:
            with open('locators.json', encoding='utf-8') as file:
                self.locators = json.load(file)
            input_path = os.path.join('Facebook', 'inputs', self.filename)
            with open(input_path, encoding='utf-8') as file:
                # Strip newlines and skip blank lines: readlines() would
                # otherwise feed '\n'-terminated (or empty) URLs to driver.get.
                self.URLS = tuple(line.strip() for line in file if line.strip())
        except FileNotFoundError:
            print(os.path.join('Facebook', 'inputs', self.filename))
            print("Invalid input value")
            sys.exit(1)

    # crawling part of the code
    def crawl(self):
        """Visit every URL, expand its comment section, then parse it."""
        counter = len(self.URLS)
        for idx, url in enumerate(self.URLS):
            # redirect and wait for page to load
            self.driver.get(url)
            self.driver.implicitly_wait(4)
            if 'videos' in url:
                try:
                    # Probe for the sort dropdown; raises when not rendered yet.
                    self.driver.find_element(
                        By.XPATH,
                        "//*[text()='Most relevant' or text()='Newest' or text()='Oldest']")
                except selenium.common.exceptions.NoSuchElementException:
                    # Dropdown hidden behind an overflow element — click to reveal.
                    self.driver.find_element(
                        By.XPATH,
                        "//span[contains(@class, 'x6ikm8r x10wlt62 xlyipyv')]").click()
                    # BUG FIX: find_element never returns None (it raises on a
                    # miss), so the old `is not None` test was meaningless and
                    # could itself raise. find_elements returns a possibly-empty
                    # list, making this a safe presence check.
                    if self.driver.find_elements(
                            By.XPATH,
                            "//div[contains(@class, 'x78zum5 xdt5ytf x1iyjqo2 x7ywyr2')]"):
                        print('Cant crawl comments section')
                        continue
                self.close_censorship('Newest')
            else:
                try:
                    self.driver.find_element(
                        By.XPATH,
                        "//*[text()='Most relevant' or text()='Newest' or text()='Oldest']")
                except selenium.common.exceptions.NoSuchElementException:
                    # Dropdown absent on this layout; close_censorship retries anyway.
                    pass
                self.close_censorship('All comments')
            self.driver.implicitly_wait(3)
            print('continue scraping')
            # clicking features
            self.view_more_comments()
            self.show_replies()
            self.click_see_more()
            # parsing part of the code
            # Dictionary of classes, if facebook changes any class, rewrite this DICT
            if '/videos/' in url:
                self.class_dict = self.locators['facebook_video_locators']
            elif '/posts/' in url:
                self.class_dict = self.locators['facebook_post_locators']
            self.parse(self.driver.page_source, self.class_dict, self.filename)
            print(f'Done: [{idx + 1}/{counter}]')

    def view_more_comments(self):
        """Click 'View more comments' repeatedly until none remain.

        Clicks via JS to avoid interception by overlays, re-queries after
        every click (the DOM mutates), and scrolls to the bottom so lazy
        content keeps loading.
        """
        xpath = "//span[contains(text(), 'comments') and contains(text(), 'View') and contains(text(), 'more')]"
        elements = self.driver.find_elements(By.XPATH, xpath)
        while elements:
            try:
                self.driver.execute_script("arguments[0].click();", elements[0])
                elements = self.driver.find_elements(By.XPATH, xpath)
                self.driver.execute_script(
                    "var scrollingElement = (document.scrollingElement || document.body);scrollingElement.scrollTop = scrollingElement.scrollHeight;")
            except selenium.common.exceptions.StaleElementReferenceException:
                # The clicked node was detached by a re-render; just re-query.
                elements = self.driver.find_elements(By.XPATH, xpath)

    # function, for showing hidden replies
    def show_replies(self):
        """Expand every collapsed reply thread until none are left.

        The XPath matches 'repl...' labels while excluding 'Hide replies'
        and the 'Newest' filter entry.
        """
        xpath = "//span[contains(text(), 'repl') and not(contains(text(), 'Hide')) and not(contains(text(), 'Newest'))]"
        repl_elements = self.driver.find_elements(By.XPATH, xpath)
        while repl_elements:
            try:
                for element in repl_elements:
                    self.driver.execute_script("arguments[0].click();", element)
                    time.sleep(0.5)  # give the thread time to load before the next click
            except selenium.common.exceptions.StaleElementReferenceException:
                # Re-render invalidated the batch; fall through and re-query.
                pass
            repl_elements = self.driver.find_elements(By.XPATH, xpath)

    # method for expanding comments
    def click_see_more(self):
        """Expand every truncated comment body ('See more' links)."""
        elements = self.driver.find_elements(By.XPATH, "//*[text()='See more']")
        for element in elements:
            self.driver.execute_script("arguments[0].click();", element)

    # method for switching the 'Most relevant' filter to the given option
    def close_censorship(self, classification: str, max_attempts: int = 10):
        """Open the comment-sort dropdown and pick *classification*.

        :param classification: visible label to select, e.g. ``'Newest'``
            or ``'All comments'``.
        :param max_attempts: upper bound on retries while the dropdown
            renders asynchronously.

        BUG FIX: the original retried by recursing inside a bare
        ``except:``, which swallowed KeyboardInterrupt and could die with
        RecursionError on a page that never shows the dropdown. A bounded
        loop with a narrowed exception replaces it.
        """
        self.driver.implicitly_wait(3)
        for _ in range(max_attempts):
            try:
                dropdown = self.driver.find_element(
                    By.XPATH, "//*[text()='Most relevant' or text()='Newest']")
                self.driver.execute_script("arguments[0].click();", dropdown)  # clicking on it
                newest_comments = self.driver.find_element(
                    By.XPATH, f"//*[text()='{classification}']")
                self.driver.execute_script("arguments[0].click();", newest_comments)  # clicking on it
                return
            except selenium.common.exceptions.WebDriverException:
                continue  # dropdown not ready yet — retry

    def close(self):
        """Quit the browser and terminate the process."""
        print('Scraping ended successfully')
        self.driver.quit()
        sys.exit(0)