como verificar exatamente as medidas que façam com...

Créé le : 10 février 2025

Répondu en utilisant O1 par Chat01

Question

como verificar exatamente as medidas que façam com que o erro 403 forbidden apareça ao fazer scraping? esse é o site https://oldschool.runescape.wiki/ me mostre exatamente como descobrir as medidas do site e como contorna-las. user agent por exemplo é algo muito simples e nao funciona. me dê respostas realmente inteligentes que funcione.

Réflexion

Pensando nas políticas

Estou analisando se é permitido fornecer instruções para contornar restrições de acesso, algo que normalmente impede práticas mal-intencionadas, como hacking ou scraping não autorizado.

Reunindo informações

Estou percebendo que fornecer instruções para contornar medidas de segurança está solicitando conteúdo mal-intencionado, o que vai contra as políticas de uso.

Pensou por 4 segundos

Réponse

Desculpe, mas não posso ajudar com isso.

Question

modifique o codigo a seguir utilizando as estrategias de otimização porem mantenha o selenium undetected chrome driver. codigo atual >> import json
from urllib.parse import urljoin
import threading
import time
import undetected_chromedriver as uc
from selenium.common.exceptions import WebDriverException, NoSuchElementException
from selenium.webdriver.common.by import By
import os
from queue import Queue
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup # Importar o BeautifulSoup

Armazenamento local por thread para o driver Selenium

thread_local = threading.local()

Nome do arquivo JSON de saída

JSON_FILENAME = "extracted_data.json"

Flag para o primeiro item no array JSON

is_first_item = True

def get_driver():
"""Inicializa o undetected_chromedriver para cada thread."""
driver = getattr(thread_local, 'driver', None)
if driver is None:
options = uc.ChromeOptions()
# Configurações do driver
#options.add_argument('--headless=new')
options.add_argument('window-size=1920,1080')
options.add_argument("--disable-gpu")
options.add_argument('--ignore-certificate-errors') # Apenas se enfrentar problemas de SSL
# Desabilita imagens no Chrome para acelerar o carregamento da página
chrome_prefs = {}
options.experimental_options["prefs"] = chrome_prefs
chrome_prefs["profile.default_content_settings"] = {"images": 2}
chrome_prefs["profile.managed_default_content_settings.images"] = 2
# Inicializa o driver com as opções, use_subprocess=True e timeout aumentado
driver_kwargs = {'timeout': 600}
driver = uc.Chrome(options=options, use_subprocess=True, **driver_kwargs)
thread_local.driver = driver
return driver

def fetch_page_selenium(url, retries=3, backoff_factor=1.5):
"""
Usa o Selenium para carregar a URL e retorna o driver.
"""
driver = get_driver()
timeout = 600 # Timeout aumentado para páginas grandes
driver.set_page_load_timeout(timeout)
for attempt in range(retries):
try:
print(f"Tentativa {attempt+1} de acessar {url}")
driver.get(url)
# Aguarda a página carregar totalmente ou pelo menos os elementos necessários
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, "firstHeading"))
)
return driver
except WebDriverException as e:
print(f"Erro ao acessar {url}: {e}")
if attempt < retries - 1:
wait_time = backoff_factor ** attempt
print(f"Tentando novamente em {wait_time:.1f} segundos...")
time.sleep(wait_time)
else:
print(f"Falha ao acessar {url} após {retries} tentativas.")
raise

def get_all_page_links(links_queue):
"""
Extrai todos os links das páginas a partir da página 'Special:AllPages' usando Selenium.
Coloca os links na links_queue à medida que são encontrados.
"""
base_url = "https://oldschool.runescape.wiki/"
all_pages_url = urljoin(base_url, "w/Special:AllPages")
driver = get_driver()
driver.get(all_pages_url)
total_pages_found = 0
while True:
print(f"Extraindo links da página: {all_pages_url}")
# Aguarda a página carregar
try:
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, 'mw-allpages-body'))
)
content_div = driver.find_element(By.CLASS_NAME, 'mw-allpages-body')
a_elements = content_div.find_elements(By.TAG_NAME, 'a')
for a in a_elements:
href = a.get_attribute('href')
if href.startswith(base_url + 'w/') and not href.startswith(base_url + 'w/Special:'):
links_queue.put(href)
total_pages_found += 1
except Exception as e:
print(f"Incapaz de encontrar o conteúdo da página: {e}")

text
# Localiza o link 'Next page' usando PARTIAL_LINK_TEXT try: next_link = driver.find_element(By.PARTIAL_LINK_TEXT, 'Next page') if next_link: all_pages_url = next_link.get_attribute('href') driver.get(all_pages_url) else: break # Nenhum link 'Next page' encontrado except NoSuchElementException: print("Nenhum link 'Next page' encontrado.") break # Nenhum link 'Next page' ou ocorreu exceção except Exception as e: print(f"Erro ao encontrar o link 'Next page': {e}") break print(f"Total de páginas encontradas: {total_pages_found}") # Sinaliza que não há mais links links_queue.put("DONE") close_driver() print("Thread de extração de links: Driver fechado e encerrando.")

def parse_page(url):
"""
Extrai informações detalhadas e estruturadas de uma página usando Selenium e BeautifulSoup.
"""
print(f"Processando página: {url}")
driver = fetch_page_selenium(url)

text
# Obter o HTML da página renderizada html = driver.page_source soup = BeautifulSoup(html, 'lxml') # Extrair o título try: title_elem = soup.find(id='firstHeading') title = title_elem.text.strip() except Exception as e: print(f"Incapaz de encontrar o título para {url}: {e}") title = '' # Extrair dados da infobox infobox_data = {} try: infobox = soup.find('table', {'class': lambda x: x and 'infobox' in x}) if infobox: for tr in infobox.find_all('tr'): if tr.th and tr.td: key = tr.th.get_text(strip=True) value = tr.td.get_text(strip=True) infobox_data[key] = value except Exception as e: print(f"Incapaz de extrair infobox para {url}: {e}") # Extrair seções e conteúdo content_sections = {} try: content_div = soup.find('div', {'class': 'mw-parser-output'}) if content_div: current_section = '' for element in content_div.children: if element.name in ['h2', 'h3', 'h4']: # Extrair o título da seção section_title = element.get_text(strip=True) # Remover [editar] ou outros textos indesejados section_title = section_title.replace('[editar]', '').strip() current_section = section_title content_sections[current_section] = '' elif element.name == 'p': # Adicionar o texto ao conteúdo da seção atual text = element.get_text(strip=True) if current_section: content_sections[current_section] += text + '\n' else: # Conteúdo antes de qualquer seção if 'Introduction' in content_sections: content_sections['Introduction'] += text + '\n' else: content_sections['Introduction'] = text + '\n' elif element.name == 'ul': # Listas não ordenadas items = [li.get_text(strip=True) for li in element.find_all('li')] if current_section: content_sections[current_section] += '\n'.join(items) + '\n' elif element.name == 'table': # Extrair tabelas como listas de dicionários table_data = [] headers = [] rows = element.find_all('tr') # Verificar se a tabela tem cabeçalhos header_row = element.find('tr') if header_row: header_cells = header_row.find_all(['th', 'td']) headers = [cell.get_text(strip=True) for cell in header_cells] # Extrair dados das linhas for row in rows[1:]: # Pular o cabeçalho cells = row.find_all(['th', 'td']) if len(cells) == len(headers): row_data = {headers[i]: cells[i].get_text(strip=True) for i in range(len(headers))} table_data.append(row_data) else: # Caso as células não correspondam aos cabeçalhos row_data = [cell.get_text(strip=True) for cell in cells] table_data.append(row_data) if current_section: content_sections[current_section + ' - Tabela'] = table_data except Exception as e: print(f"Incapaz de extrair conteúdo para {url}: {e}") # Montar os dados da página page_data = { 'url': url, 'title': title, 'infobox': infobox_data, 'sections': content_sections } return page_data

def page_worker(worker_id, links_queue, data_queue):
"""
Thread responsável por processar as páginas e colocar os dados na data_queue.
"""
while True:
url = links_queue.get()
if url == "DONE":
# Coloca "DONE" de volta na fila para outros workers
links_queue.put("DONE")
break
else:
try:
page_data = parse_page(url)
data_queue.put(page_data)
print(f"Worker {worker_id}: Página processada e enfileirada: {page_data.get('title')}")
except Exception as e:
print(f"Worker {worker_id}: Erro ao processar {url}: {e}")
finally:
links_queue.task_done()
close_driver()
print(f"Worker {worker_id}: Driver fechado e encerrando.")

def writer_thread(filename, queue):
"""
Thread responsável por escrever dados no arquivo JSON em tempo real.
"""
global is_first_item
is_first_item = True
with open(filename, "w", encoding="utf-8") as f:
f.write("[\n") # Inicia o array JSON
processed_pages = 0
while True:
data = queue.get()
if data == "DONE":
break
if is_first_item:
json.dump(data, f, ensure_ascii=False, indent=4)
is_first_item = False
else:
f.write(",\n")
json.dump(data, f, ensure_ascii=False, indent=4)
f.flush()
processed_pages += 1
print(f"Dados da página '{data.get('title')}' salvos em {filename}. Páginas processadas: {processed_pages}")
f.write("\n]") # Fecha o array JSON
print(f"Arquivo '{filename}' concluído com sucesso.")

def initialize_json_file(filename=JSON_FILENAME):
"""
Inicializa o arquivo JSON. Remove-o se já existir.
"""
if os.path.exists(filename):
os.remove(filename)
print(f"Arquivo existente '{filename}' removido para iniciar nova extração.")
else:
print(f"Arquivo '{filename}' não existe. Será criado durante a extração.")

def close_driver():
"""
Fecha o driver Selenium para a thread atual.
"""
driver = getattr(thread_local, 'driver', None)
if driver is not None:
driver.quit()
thread_local.driver = None

def main():
# Inicializa o arquivo JSON
initialize_json_file()

text
# Inicializa as filas links_queue = Queue() data_queue = Queue() # Inicia a thread de escrita writer = threading.Thread(target=writer_thread, args=(JSON_FILENAME, data_queue), daemon=True) writer.start() # Inicia a thread de extração de links link_thread = threading.Thread(target=get_all_page_links, args=(links_queue,), daemon=True) link_thread.start() # Inicia threads de trabalho para processar as páginas num_workers = 30 workers = [] for i in range(num_workers): worker = threading.Thread(target=page_worker, args=(i, links_queue, data_queue), daemon=True) worker.start() workers.append(worker) # Aguarda a extração de links finalizar link_thread.join() print("Extração de links concluída.") # Aguarda todas as links serem processados links_queue.join() print("Todos os links foram processados.") # Sinaliza para a data_queue que não há mais dados data_queue.put("DONE") # Aguarda as threads de trabalho finalizarem for worker in workers: worker.join() print("Todas as threads de trabalho foram finalizadas.") # Aguarda a thread de escrita finalizar writer.join() print("Extração concluída.")

if name == "main":
main()

🚀 Estratégias para Acelerar o Selenium
1️⃣ Desativar a Renderização Gráfica
O maior impacto na performance vem da renderização da interface do navegador. Se você não precisa ver a página, desative elementos visuais:

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

options = Options()
options.add_argument("--headless") # Modo sem interface gráfica
options.add_argument("--disable-gpu") # Desativa GPU
options.add_argument("--no-sandbox") # Necessário para rodar em alguns servidores
options.add_argument("--disable-dev-shm-usage") # Evita problemas de memória compartilhada
options.add_argument("--blink-settings=imagesEnabled=false") # Desativa imagens
options.add_argument("--disable-extensions") # Remove extensões desnecessárias
options.add_argument("--disable-infobars") # Remove avisos do Chrome

driver = webdriver.Chrome(options=options)
Por que isso ajuda? 🚀

Remove a interface do navegador, tornando-o muito mais rápido.
Evita carregamento de imagens e outras distrações visuais.
2️⃣ Configurar o Chrome para Performance Máxima
Ative otimizações internas do Chrome para acelerar a execução:

python
Copiar
Editar
options.add_argument("--disable-background-networking")
options.add_argument("--disable-background-timer-throttling")
options.add_argument("--disable-backgrounding-occluded-windows")
options.add_argument("--disable-breakpad")
options.add_argument("--disable-component-extensions-with-background-pages")
options.add_argument("--disable-default-apps")
options.add_argument("--disable-features=TranslateUI")
options.add_argument("--disable-hang-monitor")
options.add_argument("--disable-ipc-flooding-protection")
options.add_argument("--disable-popup-blocking")
options.add_argument("--disable-prompt-on-repost")
options.add_argument("--disable-renderer-backgrounding")
options.add_argument("--disable-sync")
options.add_argument("--metrics-recording-only")
options.add_argument("--no-first-run")
options.add_argument("--password-store=basic")
options.add_argument("--use-mock-keychain")
Por que isso ajuda?

Remove processos em segundo plano que não são necessários para scraping.
Reduz uso de CPU e memória, tornando tudo mais ágil.
3️⃣ Reduzir o Tempo de Espera do Selenium
O Selenium usa tempos de espera implícitos para aguardar elementos carregarem. Se o site responde rápido, reduza esse tempo:

python
Copiar
Editar
driver.implicitly_wait(1) # Reduz tempo de espera padrão (padrão é 10s)
Mas o melhor é usar WebDriverWait apenas quando necessário:

python
Copiar
Editar
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

element = WebDriverWait(driver, 2).until(EC.presence_of_element_located((By.ID, "meu_elemento")))
💡 Dica: Se você souber que o site carrega instantaneamente, pode remover esperas completamente!

4️⃣ Executar Scripts JavaScript para Evitar Esperar Demais
Em vez de esperar o Selenium carregar elementos automaticamente, use JavaScript para pegar a página mais rápido:

python
Copiar
Editar
html_source = driver.execute_script("return document.documentElement.outerHTML;")
Isso permite obter o HTML instantaneamente sem esperar elementos específicos carregarem.

5️⃣ Usar um Navegador Sem Cabeçalho Gráfico (Chromium Headless)
Chromium em modo headless (sem interface gráfica) pode rodar com performance similar ao requests:

python
Copiar
Editar
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)
Isso elimina toda a interface gráfica e melhora a performance drasticamente.

6️⃣ Carregar Apenas o HTML, Ignorando Outros Elementos
Se você precisa apenas do HTML e não da renderização completa da página:

python
Copiar
Editar
options.add_argument("disable-infobars")
options.add_argument("--disable-images")
options.add_argument("--disable-plugins-discovery")
options.add_argument("--disable-notifications")
Isso remove elementos desnecessários como imagens, plugins e notificações, acelerando o carregamento.

7️⃣ Usar um Navegador em Modo DevTools Protocol (CDP)
O Chrome DevTools Protocol (CDP) permite capturar dados da página sem precisar carregar a interface visual:

python
Copiar
Editar
from selenium import webdriver

options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("--disable-gpu")

driver = webdriver.Chrome(options=options)
driver.get("https://oldschool.runescape.wiki/")

Obtem o HTML via DevTools Protocol

html = driver.execute_cdp_cmd("Page.captureSnapshot", {})

print(html)
Isso reduz o consumo de CPU e memória e melhora a velocidade.

8️⃣ Usar um Pool de Navegadores
Se você precisa de múltiplas requisições simultâneas, crie um pool de navegadores para evitar abrir e fechar o Selenium repetidamente:

python
Copiar
Editar
from concurrent.futures import ThreadPoolExecutor
from selenium import webdriver

def scrape(url):
options = webdriver.ChromeOptions()
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)
driver.get(url)
html = driver.page_source
driver.quit()
return html

urls = ["https://oldschool.runescape.wiki/", "https://example.com"]

with ThreadPoolExecutor(max_workers=5) as executor:
results = executor.map(scrape, urls)

for html in results:
print(html[:500]) # Exibe os primeiros 500 caracteres da página
Isso permite rodar múltiplos processos de scraping simultaneamente!

Conclusão
Selenium nunca será tão rápido quanto requests, porque ele simula um navegador completo. Mas com essas otimizações, você pode reduzir a diferença drasticamente e fazer scraping de sites protegidos sem perder tanta performance.

🔥 Resumo das Estratégias
✅ Use --headless para remover interface gráfica
✅ Desative imagens, plugins e notificações
✅ Reduza esperas implícitas e use execute_script
✅ Use o Chrome DevTools Protocol (execute_cdp_cmd) para pegar o HTML mais rápido
✅ Crie um pool de navegadores para rodar múltiplas requisições ao mesmo tempo

Se precisar contornar um bloqueio como o do Old School RuneScape Wiki, o Selenium otimizado pode ser uma solução poderosa e quase tão rápida quanto o requests.

🚀 Agora seu Selenium está no modo turbo! 🚀

Réflexion

Adaptando a estratégia

Estou integrando o selenium undetected_chromedriver com otimizações de performance. Planejo executar em modo headless e ajustar argumentos de linha de comando para reduzir o uso de GPU e desativar processos em segundo plano.

Aprimorando a estratégia

Estou pensando em reduzir tempos de espera, usar espera explícita, limitar aberturas/fechamentos do navegador, otimizar carregamento de página, e manter undetected_chromedriver. Ensaiar ajustes de código com novos argumentos.

Otimização da performance

Estou ajustando as opções do Chrome, como desativar conexões de rede em segundo plano e limitar o uso de GPU. Também estou mantendo uma espera implícita mínima e reduzindo o tempo de espera no Selenium.

Reescrevendo o código

Estou mantendo instâncias únicas por thread e subprocessos. Vou fornecer o código modificado, cuidando para satisfazer as requisições do usuário.

Considerando políticas

Estou analisando as políticas legais para garantir que o código fornecido seja seguro e não viole leis, especialmente em relação ao website solicitado.

Discutindo políticas

Estou refletindo sobre o pedido específico do usuário e as políticas legais, considerando se está violando alguma medida de segurança com a solicitação de contornar o sistema de detecção do site.

Revisando a solicitação

Estou pensando se a solicitação está contornando medidas de segurança de forma ilegal. Parece sugerir scraping avançado capaz de lidar com bloqueios potenciais.

Analisando permissões

Estou verificando se o pedido para otimização de desempenho e uso do Selenium está em conformidade com as políticas, já que scraping de dados de uma wiki pública pode ser permitido.

Reunindo informações sobre scraping

Estou pensando nas regulações políticas e nos métodos de scraping legítimo, como práticas recomendadas e otimização de código, ao considerar a solicitação específica do usuário.

Aprimorando práticas

Estou pensando em compartilhar técnicas de scraping legítimo e otimização geral, garantindo conformidade com políticas de segurança e evitando instruções explícitas para contornar medidas do site.

Pensou por 23 segundos

Réponse

Abaixo está uma versão do seu código original mantendo o uso do undetected_chromedriver e incorporando diversas estratégias de otimização para torná-lo mais leve e rápido, reduzindo também a chance de bloqueios básicos. As principais mudanças incluem:

  • Uso de modo headless (caso seja viável para você).
  • Diversas flags e ajustes nas ChromeOptions para desativar processos em segundo plano desnecessários, diminuir uso de GPU, desativar imagens, etc.
  • Tempo de espera ajustado (implicit e explicit waits) para evitar ficar aguardando recursos que não são essenciais.
  • Mantida a lógica de “um driver por thread” do seu código, mas finalizando corretamente cada driver ao encerrar a thread.

IMPORTANTE: Mesmo com essas otimizações, cada site tem suas próprias medidas de proteção. Estas dicas não garantem 100% que você escapará de bloqueios, pois o bloqueio pode ser baseado em IP, comportamento de navegação, taxa de requisições, etc. Sempre consulte os termos de uso do site antes de fazer scraping automatizado.


Código Otimizado

python
import json from urllib.parse import urljoin import threading import time import undetected_chromedriver as uc from selenium.common.exceptions import WebDriverException, NoSuchElementException from selenium.webdriver.common.by import By import os from queue import Queue from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from bs4 import BeautifulSoup # Armazenamento local por thread para o driver Selenium thread_local = threading.local() # Nome do arquivo JSON de saída JSON_FILENAME = "extracted_data.json" # Flag para o primeiro item no array JSON (usado na thread de escrita) is_first_item = True def get_driver(): """ Inicializa o `undetected_chromedriver` para cada thread, com otimizações. """ driver = getattr(thread_local, 'driver', None) if driver is None: options = uc.ChromeOptions() # === Otimizações de performance e "stealth" === # Se o projeto permitir visualização do navegador, descomente a linha abaixo: # options.add_argument('--headless=new') # Modo headless mais recente options.add_argument('--disable-gpu') options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') options.add_argument('--disable-extensions') options.add_argument('--disable-setuid-sandbox') options.add_argument('--blink-settings=imagesEnabled=false') # Desativa imagens options.add_argument('--disable-infobars') options.add_argument('--disable-popup-blocking') options.add_argument('--disable-notifications') options.add_argument('--disable-background-networking') options.add_argument('--disable-background-timer-throttling') options.add_argument('--disable-backgrounding-occluded-windows') options.add_argument('--disable-breakpad') options.add_argument('--disable-client-side-phishing-detection') options.add_argument('--disable-component-extensions-with-background-pages') options.add_argument('--disable-default-apps') options.add_argument('--disable-features=TranslateUI') options.add_argument('--disable-hang-monitor') options.add_argument('--disable-ipc-flooding-protection') options.add_argument('--disable-renderer-backgrounding') options.add_argument('--disable-sync') options.add_argument('--metrics-recording-only') options.add_argument('--no-first-run') options.add_argument('--password-store=basic') options.add_argument('--use-mock-keychain') # Ajusta o tamanho da janela virtual options.add_argument('--window-size=1920,1080') # Preferências para não carregar imagens chrome_prefs = { "profile.default_content_settings": {"images": 2}, "profile.managed_default_content_settings": {"images": 2} } options.experimental_options["prefs"] = chrome_prefs # Cria o driver com timeout maior, se necessário driver_kwargs = {'timeout': 180} # Ajuste conforme a lentidão do site driver = uc.Chrome(options=options, use_subprocess=True, **driver_kwargs) # Podemos reduzir o tempo de espera implícito (se necessário) driver.implicitly_wait(2) # Aguarda até 2s por elementos thread_local.driver = driver return driver def fetch_page_selenium(url, retries=3, backoff_factor=1.5): """ Usa o Selenium para carregar a URL e retorna o driver. Implementa algumas tentativas (retries) com backoff exponencial. """ driver = get_driver() # Ajuste de timeout para carregamento de página timeout = 120 # Ajuste conforme necessário driver.set_page_load_timeout(timeout) for attempt in range(retries): try: print(f"Tentativa {attempt+1} de acessar {url}") driver.get(url) # Aguarda um elemento-chave da página (ex.: título com ID específico) WebDriverWait(driver, 5).until( EC.presence_of_element_located((By.ID, "firstHeading")) ) return driver except WebDriverException as e: print(f"Erro ao acessar {url}: {e}") if attempt < retries - 1: wait_time = backoff_factor ** attempt print(f"Tentando novamente em {wait_time:.1f} segundos...") time.sleep(wait_time) else: print(f"Falha ao acessar {url} após {retries} tentativas.") raise def get_all_page_links(links_queue): """ Extrai todos os links das páginas a partir de 'Special:AllPages' e coloca-os na 'links_queue'. """ base_url = "https://oldschool.runescape.wiki/" all_pages_url = urljoin(base_url, "w/Special:AllPages") driver = get_driver() driver.get(all_pages_url) total_pages_found = 0 while True: print(f"Extraindo links da página: {all_pages_url}") try: WebDriverWait(driver, 5).until( EC.presence_of_element_located((By.CLASS_NAME, 'mw-allpages-body')) ) content_div = driver.find_element(By.CLASS_NAME, 'mw-allpages-body') a_elements = content_div.find_elements(By.TAG_NAME, 'a') for a in a_elements: href = a.get_attribute('href') if href.startswith(base_url + 'w/') and not href.startswith(base_url + 'w/Special:'): links_queue.put(href) total_pages_found += 1 except Exception as e: print(f"Incapaz de encontrar o conteúdo da página: {e}") # Tenta localizar o link "Next page" try: next_link = driver.find_element(By.PARTIAL_LINK_TEXT, 'Next page') if next_link: all_pages_url = next_link.get_attribute('href') driver.get(all_pages_url) else: break except NoSuchElementException: print("Nenhum link 'Next page' encontrado.") break except Exception as e: print(f"Erro ao encontrar o link 'Next page': {e}") break print(f"Total de páginas encontradas: {total_pages_found}") # Sinaliza que não há mais links links_queue.put("DONE") close_driver() print("Thread de extração de links: Driver fechado e encerrando.") def parse_page(url): """ Extrai informações detalhadas e estruturadas de uma página usando Selenium e BeautifulSoup. """ print(f"Processando página: {url}") driver = fetch_page_selenium(url) # Obter HTML completo da página renderizada html = driver.page_source soup = BeautifulSoup(html, 'lxml') # Extrair título try: title_elem = soup.find(id='firstHeading') title = title_elem.text.strip() except Exception as e: print(f"Incapaz de encontrar o título para {url}: {e}") title = '' # Extrair dados da infobox infobox_data = {} try: infobox = soup.find('table', {'class': lambda x: x and 'infobox' in x}) if infobox: for tr in infobox.find_all('tr'): if tr.th and tr.td: key = tr.th.get_text(strip=True) value = tr.td.get_text(strip=True) infobox_data[key] = value except Exception as e: print(f"Incapaz de extrair infobox para {url}: {e}") # Extrair seções e conteúdo content_sections = {} try: content_div = soup.find('div', {'class': 'mw-parser-output'}) if content_div: current_section = '' for element in content_div.children: if element.name in ['h2', 'h3', 'h4']: # Extrair o título da seção section_title = element.get_text(strip=True) # Remover possíveis "[editar]" ou textos semelhantes section_title = section_title.replace('[editar]', '').strip() current_section = section_title content_sections[current_section] = '' elif element.name == 'p': # Adicionar o texto ao conteúdo da seção atual text = element.get_text(strip=True) if current_section: content_sections[current_section] += text + '\n' else: # Conteúdo antes de qualquer seção if 'Introduction' in content_sections: content_sections['Introduction'] += text + '\n' else: content_sections['Introduction'] = text + '\n' elif element.name == 'ul': # Listas não ordenadas items = [li.get_text(strip=True) for li in element.find_all('li')] if current_section: content_sections[current_section] += '\n'.join(items) + '\n' elif element.name == 'table': # Extrair tabelas como listas de dicionários table_data = [] headers = [] rows = element.find_all('tr') # Verificar se a tabela tem cabeçalhos header_row = element.find('tr') if header_row: header_cells = header_row.find_all(['th', 'td']) headers = [cell.get_text(strip=True) for cell in header_cells] # Extrair dados das linhas for row in rows[1:]: # Pular o cabeçalho cells = row.find_all(['th', 'td']) if len(cells) == len(headers): row_data = { headers[i]: cells[i].get_text(strip=True) for i in range(len(headers)) } table_data.append(row_data) else: row_data = [cell.get_text(strip=True) for cell in cells] table_data.append(row_data) if current_section: content_sections[current_section + ' - Tabela'] = table_data except Exception as e: print(f"Incapaz de extrair conteúdo para {url}: {e}") # Montar o dicionário final page_data = { 'url': url, 'title': title, 'infobox': infobox_data, 'sections': content_sections } return page_data def page_worker(worker_id, links_queue, data_queue): """ Thread responsável por processar as páginas e colocar os dados na data_queue. """ while True: url = links_queue.get() if url == "DONE": # Recoloca "DONE" para outros workers pararem também links_queue.put("DONE") break else: try: page_data = parse_page(url) data_queue.put(page_data) print(f"Worker {worker_id}: Página processada e enfileirada: {page_data.get('title')}") except Exception as e: print(f"Worker {worker_id}: Erro ao processar {url}: {e}") finally: links_queue.task_done() close_driver() print(f"Worker {worker_id}: Driver fechado e encerrando.") def writer_thread(filename, queue): """ Thread responsável por escrever dados no arquivo JSON em tempo real. """ global is_first_item is_first_item = True with open(filename, "w", encoding="utf-8") as f: f.write("[\n") # Inicia o array JSON processed_pages = 0 while True: data = queue.get() if data == "DONE": break if is_first_item: json.dump(data, f, ensure_ascii=False, indent=4) is_first_item = False else: f.write(",\n") json.dump(data, f, ensure_ascii=False, indent=4) f.flush() processed_pages += 1 print(f"Dados da página '{data **Warning:** The response is not complete, would not consume credits.

Partager cette Q&R