C:\pythonapigenaiprojeto>python gemini_crawler.py ...
Criado em: 10 de fevereiro de 2025
Criado em: 10 de fevereiro de 2025
C:\pythonapigenaiprojeto>python gemini_crawler.py [Erro] Falha ao ler extracted_data.json: Expecting ',' delimiter: line 22427 column 2 (char 970901). Reiniciando em branco. [Writer] Iniciando escrita em extracted_data.json [Worker 2] Iniciando. [Worker 0] Iniciando. [Worker 4] Iniciando. [Worker 1] Iniciando. [Worker 3] Iniciando. [Process-2] Iniciando extração de links. esse é o codigo completo do projeto >>> import json
from urllib.parse import urljoin
import time
import undetected_chromedriver as uc
from selenium.common.exceptions import WebDriverException, NoSuchElementException
from selenium.webdriver.common.by import By
import os
from bs4 import BeautifulSoup
from multiprocessing import Process, Queue, current_process
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
JSON_FILENAME = "extracted_data.json"
driver = None
########################################################################
########################################################################
def load_existing_data(filename):
"""
Lê o arquivo JSON (se existir) e retorna (existing_data, processed_urls).
existing_data: lista dos itens já presentes no JSON.
processed_urls: set com todas as URLs já processadas.
Se não existir ou estiver vazio/ilegível, retorna listas/set vazios.
"""
if not os.path.exists(filename):
return [], set()
texttry: with open(filename, "r", encoding="utf-8") as f: existing_data = json.load(f) # Espera-se um array de objetos if not isinstance(existing_data, list): print(f"[Aviso] Arquivo {filename} existe, mas não é uma lista. Reiniciando em branco.") return [], set() except Exception as e: print(f"[Erro] Falha ao ler {filename}: {e}. Reiniciando em branco.") return [], set() # Extrai URLs já processadas processed_urls = set() for item in existing_data: url = item.get("url") if url: processed_urls.add(url) print(f"[Info] {len(existing_data)} itens existentes carregados de '{filename}'.") return existing_data, processed_urls
def get_driver():
"""
Inicializa o undetected_chromedriver
para cada processo se ainda não houver um driver,
com configurações otimizadas para carregamento mais rápido.
"""
global driver
if driver is None:
options = uc.ChromeOptions()
text# Estratégia de carregamento 'eager' (não espera todos os recursos como imagens) options.page_load_strategy = 'eager' # Configurações gerais do driver options.add_argument('--headless=new') options.add_argument('window-size=1920,1080') options.add_argument("--disable-gpu") options.add_argument('--disable-extensions') # Desativa extensões options.add_argument('--ignore-certificate-errors') # Se enfrentar problemas de SSL # Desabilita imagens no Chrome para acelerar o carregamento chrome_prefs = {} options.experimental_options["prefs"] = chrome_prefs chrome_prefs["profile.default_content_settings"] = {"images": 2} chrome_prefs["profile.managed_default_content_settings.images"] = 2 # Inicializa o driver com as opções, use_subprocess=True e timeout aumentado driver_kwargs = {'timeout': 600} driver = uc.Chrome(options=options, use_subprocess=True, **driver_kwargs) return driver
def close_driver():
"""
Fecha o driver Selenium para o processo atual.
"""
global driver
if driver is not None:
try:
driver.quit()
except:
pass
driver = None
def fetch_page_selenium(url, retries=3, backoff_factor=1.2):
"""
Usa o Selenium para carregar a URL e retorna o driver.
- pageLoadStrategy='eager' permite que retorne sem esperar todo carregamento.
- Verificamos a presença de 'mw-parser-output' (conteúdo principal) para garantir
que já podemos extrair algo.
"""
drv = get_driver()
texttimeout = 60 drv.set_page_load_timeout(timeout) for attempt in range(retries): try: print(f"[{current_process().name}] Tentativa {attempt+1} de acessar {url}") drv.get(url) # Aguarda apenas a presença do contêiner principal (mw-parser-output) WebDriverWait(drv, 5).until( EC.presence_of_element_located((By.CLASS_NAME, "mw-parser-output")) ) return drv except WebDriverException as e: print(f"[{current_process().name}] Erro ao acessar {url}: {e}") if attempt < retries - 1: wait_time = (backoff_factor ** attempt) print(f"[{current_process().name}] Tentando novamente em {wait_time:.1f} segundos...") time.sleep(wait_time) else: print(f"[{current_process().name}] Falha ao acessar {url} após {retries} tentativas.") raise
########################################################################
########################################################################
def get_all_page_links(links_queue, processed_urls):
"""
Extrai todos os links das páginas a partir da página 'Special:AllPages' usando Selenium.
Coloca os links na links_queue
à medida que são encontrados.
Ao final, insere "DONE" na fila para indicar fim.
Pula URLs já contidas em processed_urls.
"""
print(f"[{current_process().name}] Iniciando extração de links.")
textbase_url = "https://oldschool.runescape.wiki/" all_pages_url = urljoin(base_url, "w/Special:AllPages") drv = get_driver() try: drv.set_page_load_timeout(30) drv.get(all_pages_url) WebDriverWait(drv, 5).until( EC.presence_of_element_located((By.CLASS_NAME, 'mw-allpages-body')) ) except Exception as e: print(f"[{current_process().name}] Erro inicial ao abrir Special:AllPages: {e}") total_pages_found = 0 while True: print(f"[{current_process().name}] Extraindo links da página: {all_pages_url}") try: WebDriverWait(drv, 5).until( EC.presence_of_element_located((By.CLASS_NAME, 'mw-allpages-body')) ) content_div = drv.find_element(By.CLASS_NAME, 'mw-allpages-body') a_elements = content_div.find_elements(By.TAG_NAME, 'a') for a in a_elements: href = a.get_attribute('href') if href.startswith(base_url + 'w/') and not href.startswith(base_url + 'w/Special:'): # Verifica se já processada if href not in processed_urls: links_queue.put(href) total_pages_found += 1 except Exception as e: print(f"[{current_process().name}] Incapaz de encontrar o conteúdo da página: {e}") # Localiza o link 'Next page' try: next_link = drv.find_element(By.PARTIAL_LINK_TEXT, 'Next page') if next_link: all_pages_url = next_link.get_attribute('href') drv.get(all_pages_url) else: break # Nenhum link 'Next page' encontrado except NoSuchElementException: print(f"[{current_process().name}] Nenhum link 'Next page' encontrado.") break except Exception as e: print(f"[{current_process().name}] Erro ao encontrar o link 'Next page': {e}") break print(f"[{current_process().name}] Total de novas páginas encontradas: {total_pages_found}") # Sinaliza que não há mais links links_queue.put("DONE") close_driver() print(f"[{current_process().name}] Extração de links concluída e driver fechado.")
def parse_page(url):
"""
Extrai informações detalhadas e estruturadas de uma página usando Selenium e BeautifulSoup.
Usa tempo de espera menor, apoiado em 'eager'.
"""
print(f"[{current_process().name}] Processando página: {url}")
drv = fetch_page_selenium(url)
texthtml = drv.page_source soup = BeautifulSoup(html, 'html.parser') # Extrair o título try: title_elem = soup.find(id='firstHeading') title = title_elem.text.strip() if title_elem else '' except Exception as e: print(f"[{current_process().name}] Incapaz de encontrar o título para {url}: {e}") title = '' # Extrair dados da infobox infobox_data = {} try: infobox = soup.find('table', {'class': lambda x: x and 'infobox' in x}) if infobox: for tr in infobox.find_all('tr'): if tr.th and tr.td: key = tr.th.get_text(strip=True) value = tr.td.get_text(strip=True) infobox_data[key] = value except Exception as e: print(f"[{current_process().name}] Incapaz de extrair infobox para {url}: {e}") # Extrair seções e conteúdo content_sections = {} try: content_div = soup.find('div', {'class': 'mw-parser-output'}) if content_div: current_section = '' for element in content_div.children: if element.name in ['h2', 'h3', 'h4']: section_title = element.get_text(strip=True).replace('[editar]', '').strip() current_section = section_title content_sections[current_section] = '' elif element.name == 'p': text = element.get_text(strip=True) if current_section: content_sections[current_section] += text + '\n' else: # Conteúdo antes de qualquer seção if 'Introduction' in content_sections: content_sections['Introduction'] += text + '\n' else: content_sections['Introduction'] = text + '\n' elif element.name == 'ul': items = [li.get_text(strip=True) for li in element.find_all('li')] if current_section: content_sections[current_section] += '\n'.join(items) + '\n' elif element.name == 'table': table_data = [] headers = [] rows = element.find_all('tr') header_row = element.find('tr') if header_row: header_cells = header_row.find_all(['th', 'td']) headers = [cell.get_text(strip=True) for cell in header_cells] for row in rows[1:]: cells = row.find_all(['th', 'td']) if len(cells) == len(headers): row_data = { headers[i]: cells[i].get_text(strip=True) for i in range(len(headers)) } table_data.append(row_data) else: row_data = [cell.get_text(strip=True) for cell in cells] table_data.append(row_data) if current_section: content_sections[current_section + ' - Tabela'] = table_data except Exception as e: print(f"[{current_process().name}] Incapaz de extrair conteúdo para {url}: {e}") return { 'url': url, 'title': title, 'infobox': infobox_data, 'sections': content_sections }
def page_worker(worker_id, links_queue, data_queue):
"""
Processo responsável por processar as páginas e colocar os dados na data_queue.
"""
print(f"[Worker {worker_id}] Iniciando.")
while True:
url = links_queue.get()
if url == "DONE":
links_queue.put("DONE")
break
else:
try:
page_data = parse_page(url)
data_queue.put(page_data)
print(f"[Worker {worker_id}] Página processada: {page_data.get('title')}")
except Exception as e:
print(f"[Worker {worker_id}] Erro ao processar {url}: {e}")
close_driver()
print(f"[Worker {worker_id}] Finalizando e driver fechado.")
########################################################################
########################################################################
def writer_process(filename, data_queue, existing_data):
"""
Processo responsável por escrever dados no arquivo JSON em tempo real.
text- Primeiro reescreve todos os dados antigos (existing_data). - Depois, a cada novo item recebido, escreve no mesmo array JSON. - Ao terminar, fecha corretamente o array. """ print(f"[Writer] Iniciando escrita em {filename}") with open(filename, "w", encoding="utf-8") as f: f.write("[\n") # Inicia o array JSON # 3.1 Escreve dados antigos (se houver) is_first_item = True if existing_data: # Escreve o primeiro item sem vírgula json.dump(existing_data[0], f, ensure_ascii=False, indent=4) # Se houver mais, escreve com vírgula for old_item in existing_data[1:]: f.write(",\n") json.dump(old_item, f, ensure_ascii=False, indent=4) is_first_item = False processed_pages = len(existing_data) # 3.2 Recebe novos dados e escreve incrementalmente while True: data = data_queue.get() if data == "DONE": break if is_first_item: # Primeira inserção "nova" json.dump(data, f, ensure_ascii=False, indent=4) is_first_item = False else: f.write(",\n") json.dump(data, f, ensure_ascii=False, indent=4) f.flush() processed_pages += 1 print(f"[Writer] Dados da página '{data.get('title')}' salvos. Total processadas: {processed_pages}") # Fecha o array JSON f.write("\n]") print(f"[Writer] Escrita concluída no arquivo '{filename}'.")
def main():
# Carrega dados existentes e URLs já processadas
existing_data, processed_urls = load_existing_data(JSON_FILENAME)
text# Cria as filas de comunicação entre processos links_queue = Queue() data_queue = Queue() # Inicia o processo escritor (passando existing_data para ele) writer = Process(target=writer_process, args=(JSON_FILENAME, data_queue, existing_data)) writer.start() # Inicia o processo de extração de links, passando processed_urls para não repetir link_proc = Process(target=get_all_page_links, args=(links_queue, processed_urls)) link_proc.start() # Inicia processos de trabalho para processar as páginas num_workers = 5 workers = [] for i in range(num_workers): p = Process(target=page_worker, args=(i, links_queue, data_queue)) p.start() workers.append(p) # Aguarda a extração de links finalizar link_proc.join() print("Extração de links concluída.") # Aguarda todos os workers finalizarem for w in workers: w.join() print("Todos os workers foram finalizados.") # Sinaliza para o processo de escrita que não há mais dados data_queue.put("DONE") # Aguarda o processo de escrita finalizar antes de encerrar o script writer.join() print("Processo de escrita finalizado.") print("Extração concluída com sucesso.")
if name == "main":
main()
Analisando o problema
O erro:
[Erro] Falha ao ler extracted_data.json: Expecting ',' delimiter: line 22427 column 2 (char 970901).
indica que seu arquivo JSON está corrompido em algum ponto, faltando ou sobrando uma vírgula (,
), ou algum caractere está fora do lugar. Isso normalmente ocorre quando o script é interrompido de forma abrupta (por exemplo, se fechar a janela do terminal ou usar CTRL + C
no momento em que o arquivo ainda está sendo escrito).
O trecho relevante é este, dentro do writer_process
:
pythonwith open(filename, "w", encoding="utf-8") as f: f.write("[\n") # Inicia o array JSON ... # Recebe novos dados e escreve incrementalmente while True: data = data_queue.get() if data == "DONE": break if is_first_item: # Primeira inserção "nova" json.dump(data, f, ensure_ascii=False, indent=4) is_first_item = False else: f.write(",\n") json.dump(data, f, ensure_ascii=False, indent=4) f.flush() ... f.write("\n]")
Quando o processo é encerrado antes de completar a escrita, o arquivo pode:
]
de fechamento do array JSON;Na próxima vez que seu script roda, ele tenta fazer json.load(...)
nesse arquivo e falha, gerando o erro de JSON inválido. Daí o próprio script exibe que teve erro e “reinicia em branco”.
extracted_data.json
Se você já tem muitos dados e não quer perdê-los, a primeira medida é tentar corrigir manualmente o arquivo.
extracted_data.json
em um editor de texto que exiba o número de linha.line 22427 column 2
).}
sobrando ou se o arquivo não fechou com ]
.Se não der erro, significa que você conseguiu consertar.pythonimport json with open("extracted_data.json", encoding="utf-8") as f: data = json.load(f) print("JSON lido com sucesso!")
Essa abordagem resolve o problema de “perder” dados, pois você conserta o arquivo já existente e evita o reset.
Mesmo corrigindo o arquivo atual, o problema pode voltar a ocorrer se o script for interrompido abruptamente. Para mitigar isso:
Escrita em arquivo temporário
Uma prática comum é escrever tudo num arquivo temporário e, no final, renomear para extracted_data.json
. Assim, se houver interrupção no meio do processo, você não corrompe o arquivo principal. Exemplo:
pythonimport os import json from tempfile import NamedTemporaryFile def writer_process(filename, data_queue, existing_data): temp_filename = filename + ".tmp" with open(temp_filename, "w", encoding="utf-8") as f: f.write("[\n") # ... mesma lógica de escrita ... f.write("\n]") # Se chegou até aqui, a escrita foi concluída com sucesso, então renomeia os.replace(temp_filename, filename)
Dessa forma, só substituímos o extracted_data.json
depois de escrever e fechar com sucesso o temporário. Se houver interrupção, o arquivo antigo permanece íntegro.
Escrita “bufferizada” ao final
Outra possibilidade, se a quantidade de dados não for gigante, é manter tudo em memória e escrever apenas quando terminar. Assim:
pythondef writer_process(filename, data_queue, existing_data): all_data = existing_data[:] # copia while True: data = data_queue.get() if data == "DONE": break all_data.append(data) # Agora escreve de uma vez só with open(filename, "w", encoding="utf-8") as f: json.dump(all_data, f, ensure_ascii=False, indent=4)
Dessa forma, você só faz uma escrita final (minimizando chance de corromper). Porém, se você coleta milhares ou milhões de itens, pode ficar muito grande em memória.
Tratamento de sinal (SIGINT, SIGTERM)
Se quiser ser mais sofisticado, pode capturar sinais (por exemplo SIGINT de CTRL + C
) e tratar para fechar corretamente o arquivo, garantindo que o f.write("\n]")
seja executado. Algo como:
pythonimport signal import sys def writer_process(filename, data_queue, existing_data): def handle_signal(signum, frame): print(f"[Writer] Recebido sinal {signum}. Fechando array JSON corretamente...") data_queue.put("DONE") signal.signal(signal.SIGINT, handle_signal) signal.signal(signal.SIGTERM, handle_signal) with open(filename, "w", encoding="utf-8") as f: ... while True: data = data_queue.get() if data == "DONE": break ... f.write("\n]") print("[Writer] Finalizado com segurança!") sys.exit(0)
Assim, mesmo que o usuário pressione CTRL + C
, o processo writer tenta escrever o fechamento do array.
Se você quer que o script seja resiliente e consiga aproveitar parte do arquivo corrompido:
json
nativo, mas existem bibliotecas ou gambiarras para ignorar o final quebrado.Isso, porém, complica bastante o código. Para a maioria dos casos, é mais simples evitar interromper bruscamente ou usar o esquema de arquivo temporário.
os.replace
no final, para que o JSON principal não seja corrompido em caso de parada inesperada.Com isso, você evita (ou minimiza muito) o erro de JSON quebrado. E, se ocorrer, consegue retomar sem perder dados.
nao gostei de nenhuma das soluções. encontre outra melhor.
Uma abordagem mais resiliente, prática e simples de implementar (superando o esquema de “arquivo temporário” e o “fechamento manual do array JSON”) é usar “NDJSON” (newline-delimited JSON), também conhecido como “JSON lines”.
Em vez de manter um array gigante no arquivo extracted_data.json
, cada objeto fica numa linha separada, com a seguinte forma:
text{"url": "...", "title": "...", "infobox": {...}, "sections": {...}} {"url": "...", "title": "...", "infobox": {...}, "sections": {...}} {"url": "...", "title": "...", "infobox": {...}, "sections": {...}} ...
Ou seja, não tem [
no início, ]
no final, nem vírgulas entre os objetos. Cada linha é um JSON completo e independente.
json.loads(linha)
. Se alguma linha estiver corrompida, você pode ignorá-la ou tentar corrigi-la, mas ainda assim lê todas as demais."a"
), gravar uma linha (cada item que você processa), e fechar. Não é preciso regravar um array inteiro.Você pode simplificar bastante o seu writer_process
. Em vez de abrir o arquivo em modo "w"
e ficar se preocupando em iniciar [
e fechar ]
, faça algo assim:
pythonimport json import os NDJSON_FILENAME = "extracted_data.json" def writer_process_ndjson(filename, data_queue, existing_data): """ Processo responsável por escrever dados no formato NDJSON (um objeto JSON por linha). """ print(f"[Writer] Iniciando escrita NDJSON em {filename}") # 1) Escreve dados antigos (se quiser reaproveitar) # Ou, se quiser “aproveitar” os existentes, basta pular esta parte: # - Você pode já ter esse arquivo no formato NDJSON. # - Ou converter de array JSON pra NDJSON uma vez e pronto. # Se "existing_data" for um array com objetos, você pode converter e anotar # ATENÇÃO: Só faça isso se for a *primeira* vez que cria o NDJSON, # senão pode duplicar dados! if existing_data: with open(filename, "w", encoding="utf-8") as f: for item in existing_data: json_line = json.dumps(item, ensure_ascii=False) f.write(json_line + "\n") else: # Se não tem dados anteriores, só garanta que o arquivo existe # e não está corrompido. Abre com "a" para não apagar nada já escrito. open(filename, "a", encoding="utf-8").close() # 2) Agora vamos ficar lendo a fila e escrevendo um objeto por linha # em modo append. with open(filename, "a", encoding="utf-8") as f: while True: data = data_queue.get() if data == "DONE": break json_line = json.dumps(data, ensure_ascii=False) f.write(json_line + "\n") f.flush() # escreve imediatamente print(f"[Writer] Dados da página '{data.get('title')}' salvos.") print(f"[Writer] Escrita concluída no arquivo NDJSON '{filename}'.")
Supondo que você tenha esse arquivo extracted_data.json
com cada linha sendo um JSON:
pythonimport json def load_ndjson(filename): data_list = [] with open(filename, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) data_list.append(obj) except json.JSONDecodeError: # Se uma linha estiver corrompida, você pode ignorar # ou tentar consertar. Aqui, só ignoramos. pass return data_list
Isso te devolve uma lista de objetos que estavam no arquivo, sem se preocupar se, no final, tinha ou não ]
, vírgula etc.
main()
Como agora não precisamos mais “reabrir” o array JSON existente, a função que carrega dados existentes não precisa criar um processed_urls
usando a leitura de json.load(...)
de uma array. Em vez disso, você só percorre o NDJSON linha a linha, monta a lista e extrai os url
s.
Exemplo de load_existing_data_ndjson
:
pythondef load_existing_data_ndjson(filename): """ Lê o NDJSON (um JSON por linha) e retorna (existing_data, processed_urls). Se não existir, retorna listas/conjuntos vazios. """ if not os.path.exists(filename): return [], set() existing_data = [] processed_urls = set() with open(filename, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) existing_data.append(obj) if "url" in obj: processed_urls.add(obj["url"]) except json.JSONDecodeError: # Ignora linha corrompida pass print(f"[Info] {len(existing_data)} itens existentes carregados de '{filename}'.") return existing_data, processed_urls
Depois, no main()
, em vez de writer = Process(target=writer_process, ...)
, chame:
pythonwriter = Process(target=writer_process_ndjson, args=(JSON_FILENAME, data_queue, existing_data))
E pronto. Todo o resto (producer/consumer, fila de links, etc.) permanece igual. Você só muda a forma de escrita/leitura.
json_line = json.dumps(data, ...)
, não tem problema—na pior das hipóteses você perde somente o objeto que estava sendo gravado no momento, e não corrompe nada do que veio antes.Expecting ',' delimiter
: agora você não tem colchetes nem vírgulas para manter a sintaxe de array. É sempre um objeto por linha. Menos chance de quebrar.Se você “não gostou” de ter que arrumar manualmente, usar arquivo temporário ou capturar sinais, a melhor solução para gravar incrementalmente JSON de modo robusto é usar NDJSON. Ele elimina 99% dos problemas de corromper arquivos JSON por não precisar de delimitadores de array.
No final, se você precisar de um array formal, pode converter (uma única vez) chamando load_ndjson(...)
e depois json.dump(...)
para outro arquivo. Entretanto, no dia a dia, NDJSON é totalmente utilizável por bibliotecas e por scripts Python, e de fato é a forma mais tranquila de escrever dados JSON em alto volume sem corromper o arquivo a cada interrupção.
perfeito gostei. integre isso no codigo completo por favor.
A seguir está uma versão completa do seu código, adaptada para usar NDJSON (newline-delimited JSON) em vez de um único array JSON. Assim, cada objeto (página extraída) será gravado em uma linha individual no arquivo extracted_data.json
. Isso elimina a necessidade de lidar com colchetes ([
e ]
) ou vírgulas entre itens e evita corromper o arquivo caso o script seja interrompido bruscamente.
Formato NDJSON:
extracted_data.json
.[
ou ]
, nem vírgulas entre objetos.Carregamento de dados existentes:
load_existing_data_ndjson
lê o arquivo linha a linha, usando json.loads
.existing_data
(lista dos objetos válidos) e processed_urls
(conjunto com as URLs já processadas).Processo de escrita (writer_process_ndjson
):
"a"
) e, para cada novo objeto recebido na fila, escreve uma nova linha JSON.existing_data
no início (porque supostamente eles já estão no arquivo). Caso estivesse migrando de um formato anterior, poderia converter uma única vez, mas a cada execução não precisa reescrever.Resiliência:
pythonimport json from urllib.parse import urljoin import time import undetected_chromedriver as uc from selenium.common.exceptions import WebDriverException, NoSuchElementException from selenium.webdriver.common.by import By import os from bs4 import BeautifulSoup # Biblioteca de multiprocessing from multiprocessing import Process, Queue, current_process from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC # Nome do arquivo NDJSON (um JSON por linha) JSON_FILENAME = "extracted_data.json" # Cada processo terá seu próprio driver (variável global por processo) driver = None ######################################################################## # 1. Ler NDJSON existente (caso exista), guardar dados e URLs processadas ######################################################################## def load_existing_data_ndjson(filename): """ Lê o arquivo NDJSON (um objeto JSON por linha) e retorna (existing_data, processed_urls). - existing_data: lista dos itens já presentes no NDJSON. - processed_urls: set com as URLs já processadas. Se não existir ou estiver vazio, retorna listas/sets vazios. Se encontrar linhas corrompidas, ignora apenas essas linhas. """ if not os.path.exists(filename): return [], set() existing_data = [] processed_urls = set() with open(filename, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) existing_data.append(obj) if "url" in obj: processed_urls.add(obj["url"]) except json.JSONDecodeError: # Se a linha estiver corrompida, apenas ignoramos pass print(f"[Info] {len(existing_data)} itens existentes carregados de '{filename}'.") return existing_data, processed_urls ######################################################################## # 2. Processo de escrita usando NDJSON ######################################################################## def writer_process_ndjson(filename, data_queue, existing_data): """ Processo responsável por escrever cada item como uma linha JSON independente (NDJSON). - Se o arquivo não existir, cria vazio. - Abre em modo 'append' para não sobrescrever linhas já existentes. - Se você quiser migrar de um formato antigo, poderia converter `existing_data` e escrever aqui, mas tome cuidado para não duplicar linhas cada vez que rodar. """ print(f"[Writer] Iniciando escrita NDJSON em {filename}") # Se for a *primeira vez* do zero e quiser "injectar" existing_data no arquivo, # tome cuidado para não duplicar caso o arquivo já contenha dados. Exemplo: # - Se você estiver migrando do formato array e o arquivo estiver vazio, pode querer gravar isso: # with open(filename, "w", encoding="utf-8") as f: # for item in existing_data: # f.write(json.dumps(item, ensure_ascii=False) + "\n") # # - Se estiver apenas rodando várias vezes (e o arquivo já contém dados), geralmente # não faça nada aqui para não duplicar. Vamos supor que os dados antigos já estão lá. # Aqui, só nos certificamos de que o arquivo existe (se não, cria) open(filename, "a", encoding="utf-8").close() # Agora, abrimos em modo APPEND para escrever as novas páginas with open(filename, "a", encoding="utf-8") as f: while True: data = data_queue.get() if data == "DONE": break # Cada item convertido em string JSON e escrito numa linha json_line = json.dumps(data, ensure_ascii=False) f.write(json_line + "\n") f.flush() # força gravação imediata print(f"[Writer] Dados da página '{data.get('title')}' salvos.") print(f"[Writer] Escrita concluída no arquivo NDJSON '{filename}'.") ######################################################################## # Selenium / Chromedriver ######################################################################## def get_driver(): """ Inicializa o `undetected_chromedriver` para cada processo se ainda não houver um driver, com configurações otimizadas para carregamento mais rápido. """ global driver if driver is None: options = uc.ChromeOptions() # Estratégia de carregamento 'eager' options.page_load_strategy = 'eager' options.add_argument('--headless=new') options.add_argument('window-size=1920,1080') options.add_argument("--disable-gpu") options.add_argument('--disable-extensions') options.add_argument('--ignore-certificate-errors') # Desabilita imagens no Chrome para acelerar o carregamento chrome_prefs = {} options.experimental_options["prefs"] = chrome_prefs chrome_prefs["profile.default_content_settings"] = {"images": 2} chrome_prefs["profile.managed_default_content_settings.images"] = 2 driver_kwargs = {'timeout': 600} driver = uc.Chrome(options=options, use_subprocess=True, **driver_kwargs) return driver def close_driver(): """ Fecha o driver Selenium para o processo atual. """ global driver if driver is not None: try: driver.quit() except: pass driver = None ######################################################################## # Lógica de extração de páginas ######################################################################## def fetch_page_selenium(url, retries=3, backoff_factor=1.2): """ Usa o Selenium para carregar a URL e retorna o driver, com algumas tentativas e backoff exponencial leve. """ drv = get_driver() timeout = 60 drv.set_page_load_timeout(timeout) for attempt in range(retries): try: print(f"[{current_process().name}] Tentativa {attempt+1} de acessar {url}") drv.get(url) # Aguarda apenas a presença do contêiner principal (mw-parser-output) WebDriverWait(drv, 5).until( EC.presence_of_element_located((By.CLASS_NAME, "mw-parser-output")) ) return drv except WebDriverException as e: print(f"[{current_process().name}] Erro ao acessar {url}: {e}") if attempt < retries - 1: wait_time = (backoff_factor ** attempt) print(f"[{current_process().name}] Tentando novamente em {wait_time:.1f} segundos...") time.sleep(wait_time) else: print(f"[{current_process().name}] Falha ao acessar {url} após {retries} tentativas.") raise def parse_page(url): """ Extrai informações detalhadas e estruturadas de uma página usando Selenium e BeautifulSoup. """ print(f"[{current_process().name}] Processando página: {url}") drv = fetch_page_selenium(url) html = drv.page_source soup = BeautifulSoup(html, 'html.parser') # Extrair o título try: title_elem = soup.find(id='firstHeading') title = title_elem.text.strip() if title_elem else '' except Exception as e: print(f"[{current_process().name}] Incapaz de encontrar o título para {url}: {e}") title = '' # Extrair dados da infobox infobox_data = {} try: infobox = soup.find('table', {'class': lambda x: x and 'infobox' in x}) if infobox: for tr in infobox.find_all('tr'): if tr.th and tr.td: key = tr.th.get_text(strip=True) value = tr.td.get_text(strip=True) infobox_data[key] = value except Exception as e: print(f"[{current_process().name}] Incapaz de extrair infobox para {url}: {e}") # Extrair seções e conteúdo content_sections = {} try: content_div = soup.find('div', {'class': 'mw-parser-output'}) if content_div: current_section = '' for element in content_div.children: if element.name in ['h2', 'h3', 'h4']: section_title = element.get_text(strip=True).replace('[editar]', '').strip() current_section = section_title content_sections[current_section] = '' elif element.name == 'p': text = element.get_text(strip=True) if current_section: content_sections[current_section] += text + '\n' else: # Conteúdo antes de qualquer seção if 'Introduction' in content_sections: content_sections['Introduction'] += text + '\n' else: content_sections['Introduction'] = text + '\n' elif element.name == 'ul': items = [li.get_text(strip=True) for li in element.find_all('li')] if current_section: content_sections[current_section] += '\n'.join(items) + '\n' elif element.name == 'table': table_data = [] headers = [] rows = element.find_all('tr') header_row = element.find('tr') if header_row: header_cells = header_row.find_all(['th', 'td']) headers = [cell.get_text(strip=True) for cell in header_cells] for row in rows[1:]: cells = row.find_all(['th', 'td']) if len(cells) == len(headers): row_data = { headers[i]: cells[i].get_text(strip=True) for i in range(len(headers)) } table_data.append(row_data) else: row_data = [cell.get_text(strip=True) for cell in cells] table_data.append(row_data) if current_section: content_sections[current_section + ' - Tabela'] = table_data except Exception as e: print(f"[{current_process().name}] Incapaz de extrair conteúdo para {url}: {e}") return { 'url': url, 'title': title, 'infobox': infobox_data, 'sections': content_sections } ######################################################################## # 3. Worker que processa cada página ######################################################################## def page_worker(worker_id, links_queue, data_queue): """ Processo responsável por consumir URLs da links_queue, extrair dados e colocar na data_queue. """ print(f"[Worker {worker_id}] Iniciando.") while True: url = links_queue.get() if url == "DONE": # Recoloca o "DONE" para os próximos workers e sai links_queue.put("DONE") break else: try: page_data = parse_page(url) data_queue.put(page_data) print(f"[Worker {worker_id}] Página processada: {page_data.get('title')}") except Exception as e: print(f"[Worker {worker_id}] Erro ao processar {url}: {e}") close_driver() print(f"[Worker {worker_id}] Finalizando e driver fechado.") ######################################################################## # 4. Processo que extrai todos os links de Special:AllPages ######################################################################## def get_all_page_links(links_queue, processed_urls): """ Extrai todos os links das páginas a partir de 'Special:AllPages' usando Selenium. Coloca os links na links_queue e, ao final, insere "DONE" na fila. Pula URLs já contidas em processed_urls. """ print(f"[{current_process().name}] Iniciando extração de links.") base_url = "https://oldschool.runescape.wiki/" all_pages_url = urljoin(base_url, "w/Special:AllPages") drv = get_driver() try: drv.set_page_load_timeout(30) drv.get(all_pages_url) WebDriverWait(drv, 5).until( EC.presence_of_element_located((By.CLASS_NAME, 'mw-allpages-body')) ) except Exception as e: print(f"[{current_process().name}] Erro inicial ao abrir Special:AllPages: {e}") total_pages_found = 0 while True: print(f"[{current_process().name}] Extraindo links da página: {all_pages_url}") try: WebDriverWait(drv, 5).until( EC.presence_of_element_located((By.CLASS_NAME, 'mw-allpages-body')) ) content_div = drv.find_element(By.CLASS_NAME, 'mw-allpages-body') a_elements = content_div.find_elements(By.TAG_NAME, 'a') for a in a_elements: href = a.get_attribute('href') # Filtra links: if href.startswith(base_url + 'w/') and not href.startswith(base_url + 'w/Special:'): if href not in processed_urls: links_queue.put(href) total_pages_found += 1 except Exception as e: print(f"[{current_process().name}] Incapaz de encontrar o conteúdo da página: {e}") # Tenta achar o link 'Next page' try: next_link = drv.find_element(By.PARTIAL_LINK_TEXT, 'Next page') if next_link: all_pages_url = next_link.get_attribute('href') drv.get(all_pages_url) else: break except NoSuchElementException: print(f"[{current_process().name}] Nenhum link 'Next page' encontrado.") break except Exception as e: print(f"[{current_process().name}] Erro ao encontrar o link 'Next page': {e}") break print(f"[{current_process().name}] Total de novas páginas encontradas: {total_pages_found}") links_queue.put("DONE") close_driver() print(f"[{current_process().name}] Extração de links concluída e driver fechado.") ######################################################################## # 5. Função principal ######################################################################## def main(): # 5.1 Carrega dados existentes (NDJSON) e URLs já processadas existing_data, processed_urls = load_existing_data_ndjson(JSON_FILENAME) # 5.2 Cria as filas de comunicação entre processos links_queue = Queue() data_queue = Queue() # 5.3 Inicia o processo escritor (formato NDJSON) writer = Process(target=writer_process_ndjson, args=(JSON_FILENAME, data_queue, existing_data)) writer.start() # 5.4 Inicia o processo de extração de links, passando as URLs já processadas link_proc = Process(target=get_all_page_links, args=(links_queue, processed_urls)) link_proc.start() # 5.5 Inicia processos de trabalho para processar as páginas num_workers = 5 workers = [] for i in range(num_workers): p = Process(target=page_worker, args=(i, links_queue, data_queue)) p.start() workers.append(p) # 5.6 Aguarda a extração de links finalizar link_proc.join() print("Extração de links concluída.") # 5.7 Aguarda todos os workers finalizarem for w in workers: w.join() print("Todos os workers foram finalizados.") # 5.8 Sinaliza para o processo escritor que não há mais dados data_queue.put("DONE") # 5.9 Aguarda o processo de escrita finalizar antes de encerrar o script writer.join() print("Processo de escrita finalizado.") print("Extração concluída com sucesso.") # Execução if __name__ == "__main__": main()
[
e ]
no arquivo. Cada página gerada é gravada como uma linha JSON completa.extracted_data.json
em formato de array, faça a conversão uma única vez para NDJSON (salvando item a item em linhas separadas). Depois, não regrave existing_data
toda vez (ou duplicará).pythondef read_ndjson(filename): objs = [] with open(filename, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: objs.append(json.loads(line)) except json.JSONDecodeError: pass # ignorar linhas quebradas return objs
Esse é, em geral, o modo mais robusto de gravar dados JSON incrementais sem correr o risco de corromper tudo caso seja interrompido.
como fazer download de um site inteiro rapidamente? download de todas paginas url presentes naquele dominio.
Para baixar (fazer mirror) de um site inteiro – ou o mais próximo disso – existem algumas opções consagradas. A maneira mais rápida e simples geralmente não é por scripts Python, mas sim usando ferramentas especializadas, como Wget ou HTTrack. Abaixo explico as principais formas.
O Wget é uma ferramenta clássica em sistemas Unix/Linux (também há versões para Windows). Ele consegue baixar recursivamente as páginas de um site, seguindo os links internos, e gravar tudo em pastas locais.
Exemplo de comando para “espelhar” um site:
bashwget \ --mirror \ --convert-links \ --adjust-extension \ --page-requisites \ --no-parent \ http://exemplo.com/
--mirror
-r -N -l inf --no-remove-listing
).--convert-links
--adjust-extension
.html
(às vezes o servidor não tem extensão, e isso facilita abrir).--page-requisites
--no-parent
Depois que Wget concluir, você terá uma pasta local com todos os arquivos do site (ou quase todos). Basta abrir o arquivo index.html
principal no navegador para navegar offline.
Observação: se o site for muito grande, isso pode levar bastante tempo e muito espaço. Ajuste ou interrompa caso note que está baixando milhares de páginas.
O HTTrack é outra ferramenta clássica, disponível no Windows e em outras plataformas, que automatiza esse processo e oferece interface gráfica. Você pode escolher:
A vantagem do HTTrack é ter GUI (fácil para quem não quer linha de comando) e opções avançadas de filtragem (incluir/excluir certos caminhos, tipos de arquivo, etc.).
Se você preferir um código Python para ter mais controle – e não só baixar páginas, mas também processá-las, extrair dados etc. – pode usar Scrapy. Ele é um framework de web crawling. Entretanto, não é a forma “mais rápida” para baixar tudo mecanicamente: Scrapy é mais voltado a extração de dados estruturada.
Um script básico Scrapy (em pseudo-código) seria:
bashpip install scrapy scrapy startproject meu_projeto cd meu_projeto scrapy genspider site_inteiro exemplo.com
No arquivo gerado em spiders/site_inteiro.py
, você poderia ter algo como:
pythonimport scrapy class SiteInteiroSpider(scrapy.Spider): name = 'site_inteiro' allowed_domains = ['exemplo.com'] start_urls = ['http://exemplo.com/'] def parse(self, response): # Salva a página HTML em disco pagina_nome = response.url.split("/")[-1] or "index.html" with open(pagina_nome, "wb") as f: f.write(response.body) # Segue todos os links internos for link in response.css('a::attr(href)'): url = link.get() if url.startswith('http://exemplo.com'): yield scrapy.Request(url, callback=self.parse)
Depois, você roda:
bashscrapy crawl site_inteiro
Isso não é tão imediato quanto Wget/HTTrack, e precisaria de ajustes (por exemplo, criar subpastas, ajustar links relativos, baixar imagens, etc.). Mas é útil se você pretende customizar a maneira de salvar ou processar cada página.
Sites dinâmicos (com JavaScript pesado, single-page apps) podem não ser baixados corretamente apenas com Wget/HTTrack, pois esses programas não executam JavaScript complexo. Se o site for baseado em AJAX, React, Vue, Angular etc., você pode ter páginas incompletas. Neste caso, seria necessário algo como Selenium ou Playwright com abordagem de “navegador headless” que renderiza JS – mas aí a tarefa fica muito mais pesada e demorada.
Sites gigantes podem ter milhares ou milhões de páginas. Verifique o robots.txt e a política do site (termos de uso) para ver se é permitido “espelhar” o site. Caso contrário, seu IP pode ser bloqueado ou você pode até infringir termos legais.
Velocidade de download:
Uso offline: depois de baixar, você pode abrir localmente. Se o site for muito grande, espere muitas pastas e subpastas.
Em boa parte dos casos, a maneira mais prática é realmente:
bashwget --mirror --convert-links --adjust-extension --page-requisites --no-parent http://exemplo.com/
Isso vai produzir uma cópia navegável offline (HTML e arquivos relacionados) na sua máquina, de forma rápida e fácil.