C:\pythonapigenaiprojeto>python gemini_crawler.py ...

Criado em: 10 de fevereiro de 2025

Respondido usando O1 por Chat01

Pergunta

C:\pythonapigenaiprojeto>python gemini_crawler.py [Erro] Falha ao ler extracted_data.json: Expecting ',' delimiter: line 22427 column 2 (char 970901). Reiniciando em branco. [Writer] Iniciando escrita em extracted_data.json [Worker 2] Iniciando. [Worker 0] Iniciando. [Worker 4] Iniciando. [Worker 1] Iniciando. [Worker 3] Iniciando. [Process-2] Iniciando extração de links. esse é o codigo completo do projeto >>> import json
from urllib.parse import urljoin
import time
import undetected_chromedriver as uc
from selenium.common.exceptions import WebDriverException, NoSuchElementException
from selenium.webdriver.common.by import By
import os
from bs4 import BeautifulSoup

Biblioteca de multiprocessing

from multiprocessing import Process, Queue, current_process
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

JSON_FILENAME = "extracted_data.json"

Cada processo terá seu próprio driver (variável global por processo)

driver = None

########################################################################

1. Ler o JSON existente (caso exista), guardar dados e URLs processadas

########################################################################
def load_existing_data(filename):
"""
Lê o arquivo JSON (se existir) e retorna (existing_data, processed_urls).
existing_data: lista dos itens já presentes no JSON.
processed_urls: set com todas as URLs já processadas.
Se não existir ou estiver vazio/ilegível, retorna listas/set vazios.
"""
if not os.path.exists(filename):
return [], set()

text
try: with open(filename, "r", encoding="utf-8") as f: existing_data = json.load(f) # Espera-se um array de objetos if not isinstance(existing_data, list): print(f"[Aviso] Arquivo {filename} existe, mas não é uma lista. Reiniciando em branco.") return [], set() except Exception as e: print(f"[Erro] Falha ao ler {filename}: {e}. Reiniciando em branco.") return [], set() # Extrai URLs já processadas processed_urls = set() for item in existing_data: url = item.get("url") if url: processed_urls.add(url) print(f"[Info] {len(existing_data)} itens existentes carregados de '{filename}'.") return existing_data, processed_urls

def get_driver():
"""
Inicializa o undetected_chromedriver para cada processo se ainda não houver um driver,
com configurações otimizadas para carregamento mais rápido.
"""
global driver
if driver is None:
options = uc.ChromeOptions()

text
# Estratégia de carregamento 'eager' (não espera todos os recursos como imagens) options.page_load_strategy = 'eager' # Configurações gerais do driver options.add_argument('--headless=new') options.add_argument('window-size=1920,1080') options.add_argument("--disable-gpu") options.add_argument('--disable-extensions') # Desativa extensões options.add_argument('--ignore-certificate-errors') # Se enfrentar problemas de SSL # Desabilita imagens no Chrome para acelerar o carregamento chrome_prefs = {} options.experimental_options["prefs"] = chrome_prefs chrome_prefs["profile.default_content_settings"] = {"images": 2} chrome_prefs["profile.managed_default_content_settings.images"] = 2 # Inicializa o driver com as opções, use_subprocess=True e timeout aumentado driver_kwargs = {'timeout': 600} driver = uc.Chrome(options=options, use_subprocess=True, **driver_kwargs) return driver

def close_driver():
"""
Fecha o driver Selenium para o processo atual.
"""
global driver
if driver is not None:
try:
driver.quit()
except:
pass
driver = None

def fetch_page_selenium(url, retries=3, backoff_factor=1.2):
"""
Usa o Selenium para carregar a URL e retorna o driver.
- pageLoadStrategy='eager' permite que retorne sem esperar todo carregamento.
- Verificamos a presença de 'mw-parser-output' (conteúdo principal) para garantir
que já podemos extrair algo.
"""
drv = get_driver()

text
timeout = 60 drv.set_page_load_timeout(timeout) for attempt in range(retries): try: print(f"[{current_process().name}] Tentativa {attempt+1} de acessar {url}") drv.get(url) # Aguarda apenas a presença do contêiner principal (mw-parser-output) WebDriverWait(drv, 5).until( EC.presence_of_element_located((By.CLASS_NAME, "mw-parser-output")) ) return drv except WebDriverException as e: print(f"[{current_process().name}] Erro ao acessar {url}: {e}") if attempt < retries - 1: wait_time = (backoff_factor ** attempt) print(f"[{current_process().name}] Tentando novamente em {wait_time:.1f} segundos...") time.sleep(wait_time) else: print(f"[{current_process().name}] Falha ao acessar {url} após {retries} tentativas.") raise

########################################################################

2. Ajuste para evitar links que já foram processados

########################################################################
def get_all_page_links(links_queue, processed_urls):
"""
Extrai todos os links das páginas a partir da página 'Special:AllPages' usando Selenium.
Coloca os links na links_queue à medida que são encontrados.
Ao final, insere "DONE" na fila para indicar fim.
Pula URLs já contidas em processed_urls.
"""
print(f"[{current_process().name}] Iniciando extração de links.")

text
base_url = "https://oldschool.runescape.wiki/" all_pages_url = urljoin(base_url, "w/Special:AllPages") drv = get_driver() try: drv.set_page_load_timeout(30) drv.get(all_pages_url) WebDriverWait(drv, 5).until( EC.presence_of_element_located((By.CLASS_NAME, 'mw-allpages-body')) ) except Exception as e: print(f"[{current_process().name}] Erro inicial ao abrir Special:AllPages: {e}") total_pages_found = 0 while True: print(f"[{current_process().name}] Extraindo links da página: {all_pages_url}") try: WebDriverWait(drv, 5).until( EC.presence_of_element_located((By.CLASS_NAME, 'mw-allpages-body')) ) content_div = drv.find_element(By.CLASS_NAME, 'mw-allpages-body') a_elements = content_div.find_elements(By.TAG_NAME, 'a') for a in a_elements: href = a.get_attribute('href') if href.startswith(base_url + 'w/') and not href.startswith(base_url + 'w/Special:'): # Verifica se já processada if href not in processed_urls: links_queue.put(href) total_pages_found += 1 except Exception as e: print(f"[{current_process().name}] Incapaz de encontrar o conteúdo da página: {e}") # Localiza o link 'Next page' try: next_link = drv.find_element(By.PARTIAL_LINK_TEXT, 'Next page') if next_link: all_pages_url = next_link.get_attribute('href') drv.get(all_pages_url) else: break # Nenhum link 'Next page' encontrado except NoSuchElementException: print(f"[{current_process().name}] Nenhum link 'Next page' encontrado.") break except Exception as e: print(f"[{current_process().name}] Erro ao encontrar o link 'Next page': {e}") break print(f"[{current_process().name}] Total de novas páginas encontradas: {total_pages_found}") # Sinaliza que não há mais links links_queue.put("DONE") close_driver() print(f"[{current_process().name}] Extração de links concluída e driver fechado.")

def parse_page(url):
"""
Extrai informações detalhadas e estruturadas de uma página usando Selenium e BeautifulSoup.
Usa tempo de espera menor, apoiado em 'eager'.
"""
print(f"[{current_process().name}] Processando página: {url}")
drv = fetch_page_selenium(url)

text
html = drv.page_source soup = BeautifulSoup(html, 'html.parser') # Extrair o título try: title_elem = soup.find(id='firstHeading') title = title_elem.text.strip() if title_elem else '' except Exception as e: print(f"[{current_process().name}] Incapaz de encontrar o título para {url}: {e}") title = '' # Extrair dados da infobox infobox_data = {} try: infobox = soup.find('table', {'class': lambda x: x and 'infobox' in x}) if infobox: for tr in infobox.find_all('tr'): if tr.th and tr.td: key = tr.th.get_text(strip=True) value = tr.td.get_text(strip=True) infobox_data[key] = value except Exception as e: print(f"[{current_process().name}] Incapaz de extrair infobox para {url}: {e}") # Extrair seções e conteúdo content_sections = {} try: content_div = soup.find('div', {'class': 'mw-parser-output'}) if content_div: current_section = '' for element in content_div.children: if element.name in ['h2', 'h3', 'h4']: section_title = element.get_text(strip=True).replace('[editar]', '').strip() current_section = section_title content_sections[current_section] = '' elif element.name == 'p': text = element.get_text(strip=True) if current_section: content_sections[current_section] += text + '\n' else: # Conteúdo antes de qualquer seção if 'Introduction' in content_sections: content_sections['Introduction'] += text + '\n' else: content_sections['Introduction'] = text + '\n' elif element.name == 'ul': items = [li.get_text(strip=True) for li in element.find_all('li')] if current_section: content_sections[current_section] += '\n'.join(items) + '\n' elif element.name == 'table': table_data = [] headers = [] rows = element.find_all('tr') header_row = element.find('tr') if header_row: header_cells = header_row.find_all(['th', 'td']) headers = [cell.get_text(strip=True) for cell in header_cells] for row in rows[1:]: cells = row.find_all(['th', 'td']) if len(cells) == len(headers): row_data = { headers[i]: cells[i].get_text(strip=True) for i in range(len(headers)) } table_data.append(row_data) else: row_data = [cell.get_text(strip=True) for cell in cells] table_data.append(row_data) if current_section: content_sections[current_section + ' - Tabela'] = table_data except Exception as e: print(f"[{current_process().name}] Incapaz de extrair conteúdo para {url}: {e}") return { 'url': url, 'title': title, 'infobox': infobox_data, 'sections': content_sections }

def page_worker(worker_id, links_queue, data_queue):
"""
Processo responsável por processar as páginas e colocar os dados na data_queue.
"""
print(f"[Worker {worker_id}] Iniciando.")
while True:
url = links_queue.get()
if url == "DONE":
links_queue.put("DONE")
break
else:
try:
page_data = parse_page(url)
data_queue.put(page_data)
print(f"[Worker {worker_id}] Página processada: {page_data.get('title')}")
except Exception as e:
print(f"[Worker {worker_id}] Erro ao processar {url}: {e}")
close_driver()
print(f"[Worker {worker_id}] Finalizando e driver fechado.")

########################################################################

3. Manter dados existentes e escrever incrementando o mesmo arquivo

########################################################################
def writer_process(filename, data_queue, existing_data):
"""
Processo responsável por escrever dados no arquivo JSON em tempo real.

text
- Primeiro reescreve todos os dados antigos (existing_data). - Depois, a cada novo item recebido, escreve no mesmo array JSON. - Ao terminar, fecha corretamente o array. """ print(f"[Writer] Iniciando escrita em {filename}") with open(filename, "w", encoding="utf-8") as f: f.write("[\n") # Inicia o array JSON # 3.1 Escreve dados antigos (se houver) is_first_item = True if existing_data: # Escreve o primeiro item sem vírgula json.dump(existing_data[0], f, ensure_ascii=False, indent=4) # Se houver mais, escreve com vírgula for old_item in existing_data[1:]: f.write(",\n") json.dump(old_item, f, ensure_ascii=False, indent=4) is_first_item = False processed_pages = len(existing_data) # 3.2 Recebe novos dados e escreve incrementalmente while True: data = data_queue.get() if data == "DONE": break if is_first_item: # Primeira inserção "nova" json.dump(data, f, ensure_ascii=False, indent=4) is_first_item = False else: f.write(",\n") json.dump(data, f, ensure_ascii=False, indent=4) f.flush() processed_pages += 1 print(f"[Writer] Dados da página '{data.get('title')}' salvos. Total processadas: {processed_pages}") # Fecha o array JSON f.write("\n]") print(f"[Writer] Escrita concluída no arquivo '{filename}'.")

def main():
# Carrega dados existentes e URLs já processadas
existing_data, processed_urls = load_existing_data(JSON_FILENAME)

text
# Cria as filas de comunicação entre processos links_queue = Queue() data_queue = Queue() # Inicia o processo escritor (passando existing_data para ele) writer = Process(target=writer_process, args=(JSON_FILENAME, data_queue, existing_data)) writer.start() # Inicia o processo de extração de links, passando processed_urls para não repetir link_proc = Process(target=get_all_page_links, args=(links_queue, processed_urls)) link_proc.start() # Inicia processos de trabalho para processar as páginas num_workers = 5 workers = [] for i in range(num_workers): p = Process(target=page_worker, args=(i, links_queue, data_queue)) p.start() workers.append(p) # Aguarda a extração de links finalizar link_proc.join() print("Extração de links concluída.") # Aguarda todos os workers finalizarem for w in workers: w.join() print("Todos os workers foram finalizados.") # Sinaliza para o processo de escrita que não há mais dados data_queue.put("DONE") # Aguarda o processo de escrita finalizar antes de encerrar o script writer.join() print("Processo de escrita finalizado.") print("Extração concluída com sucesso.")

if name == "main":
main()

Pensamento

Analisando a situação

Estou verificando um erro ao ler um arquivo JSON em Python. Parece que está faltando uma vírgula, causando malformações. Isso sugere um problema de formatação no arquivo.

Lidando com a interrupção

Estou enfrentando problemas ao escrever o JSON devido à força da terminação do script. Uma solução é gravar em um arquivo temporário e renomeá-lo, garantindo a integridade.

Buscando alternativas

Estou pensando em ler arquivos JSON parciais ou ignorar partes incompletas. Outra ideia é gravar em um arquivo temporário e renomeá-lo, garantindo a integridade.

Aprimorando a leitura

Estou trabalhando numa abordagem para ler JSON parcial com try/except. Se falhar, removo a linha incompleta ou conserto. Isso pode melhorar a leitura e registro de arquivos JSON.

Descontaminação de dados

Estou pensando em recuperar dados JSON perdidos ou corrompidos. Isso me leva à ideia de manter uma verificação constante de formatação para evitar essa situação no futuro.

Pensou por 11 segundos

Resposta

Analisando o problema

O erro:

[Erro] Falha ao ler extracted_data.json: Expecting ',' delimiter: line 22427 column 2 (char 970901).

indica que seu arquivo JSON está corrompido em algum ponto, faltando ou sobrando uma vírgula (,), ou algum caractere está fora do lugar. Isso normalmente ocorre quando o script é interrompido de forma abrupta (por exemplo, se fechar a janela do terminal ou usar CTRL + C no momento em que o arquivo ainda está sendo escrito).

O trecho relevante é este, dentro do writer_process:

python
with open(filename, "w", encoding="utf-8") as f: f.write("[\n") # Inicia o array JSON ... # Recebe novos dados e escreve incrementalmente while True: data = data_queue.get() if data == "DONE": break if is_first_item: # Primeira inserção "nova" json.dump(data, f, ensure_ascii=False, indent=4) is_first_item = False else: f.write(",\n") json.dump(data, f, ensure_ascii=False, indent=4) f.flush() ... f.write("\n]")

Quando o processo é encerrado antes de completar a escrita, o arquivo pode:

  1. Ficar sem o ] de fechamento do array JSON;
  2. Faltar a vírgula entre elementos;
  3. Acabar cortado no meio de um objeto.

Na próxima vez que seu script roda, ele tenta fazer json.load(...) nesse arquivo e falha, gerando o erro de JSON inválido. Daí o próprio script exibe que teve erro e “reinicia em branco”.


Como resolver e prevenir

1. Corrigir manualmente o arquivo extracted_data.json

Se você já tem muitos dados e não quer perdê-los, a primeira medida é tentar corrigir manualmente o arquivo.

  1. Abra o extracted_data.json em um editor de texto que exiba o número de linha.
  2. Vá até a linha e coluna indicadas pelo erro (line 22427 column 2).
  3. Verifique se está faltando uma vírgula entre dois objetos, se há alguma chave } sobrando ou se o arquivo não fechou com ].
  4. Corrija a estrutura JSON (coloque vírgula, remova caractere extra, feche colchetes etc.).
  5. Para confirmar que o arquivo agora está válido, você pode rodar um script curto em Python, como:
    python
    import json with open("extracted_data.json", encoding="utf-8") as f: data = json.load(f) print("JSON lido com sucesso!")
    Se não der erro, significa que você conseguiu consertar.

Essa abordagem resolve o problema de “perder” dados, pois você conserta o arquivo já existente e evita o reset.


2. Melhorar a escrita para não corromper o arquivo

Mesmo corrigindo o arquivo atual, o problema pode voltar a ocorrer se o script for interrompido abruptamente. Para mitigar isso:

  1. Escrita em arquivo temporário
    Uma prática comum é escrever tudo num arquivo temporário e, no final, renomear para extracted_data.json. Assim, se houver interrupção no meio do processo, você não corrompe o arquivo principal. Exemplo:

    python
    import os import json from tempfile import NamedTemporaryFile def writer_process(filename, data_queue, existing_data): temp_filename = filename + ".tmp" with open(temp_filename, "w", encoding="utf-8") as f: f.write("[\n") # ... mesma lógica de escrita ... f.write("\n]") # Se chegou até aqui, a escrita foi concluída com sucesso, então renomeia os.replace(temp_filename, filename)

    Dessa forma, só substituímos o extracted_data.json depois de escrever e fechar com sucesso o temporário. Se houver interrupção, o arquivo antigo permanece íntegro.

  2. Escrita “bufferizada” ao final
    Outra possibilidade, se a quantidade de dados não for gigante, é manter tudo em memória e escrever apenas quando terminar. Assim:

    python
    def writer_process(filename, data_queue, existing_data): all_data = existing_data[:] # copia while True: data = data_queue.get() if data == "DONE": break all_data.append(data) # Agora escreve de uma vez só with open(filename, "w", encoding="utf-8") as f: json.dump(all_data, f, ensure_ascii=False, indent=4)

    Dessa forma, você só faz uma escrita final (minimizando chance de corromper). Porém, se você coleta milhares ou milhões de itens, pode ficar muito grande em memória.

  3. Tratamento de sinal (SIGINT, SIGTERM) Se quiser ser mais sofisticado, pode capturar sinais (por exemplo SIGINT de CTRL + C) e tratar para fechar corretamente o arquivo, garantindo que o f.write("\n]") seja executado. Algo como:

    python
    import signal import sys def writer_process(filename, data_queue, existing_data): def handle_signal(signum, frame): print(f"[Writer] Recebido sinal {signum}. Fechando array JSON corretamente...") data_queue.put("DONE") signal.signal(signal.SIGINT, handle_signal) signal.signal(signal.SIGTERM, handle_signal) with open(filename, "w", encoding="utf-8") as f: ... while True: data = data_queue.get() if data == "DONE": break ... f.write("\n]") print("[Writer] Finalizado com segurança!") sys.exit(0)

    Assim, mesmo que o usuário pressione CTRL + C, o processo writer tenta escrever o fechamento do array.


3. Permitir que o script “ignore” fim abrupto e conserte sozinho

Se você quer que o script seja resiliente e consiga aproveitar parte do arquivo corrompido:

  • Tentar fazer um “parse parcial”: não é trivial com o json nativo, mas existem bibliotecas ou gambiarras para ignorar o final quebrado.
  • Ler linha a linha e manter um buffer até encontrar erro, descartando o final malformado. Depois reescreve um novo JSON válido.

Isso, porém, complica bastante o código. Para a maioria dos casos, é mais simples evitar interromper bruscamente ou usar o esquema de arquivo temporário.


Resumo das boas práticas

  1. Corrija manualmente o arquivo atual, se quiser aproveitar os dados já coletados (ajustando a linha com problema).
  2. Aplique escrita em arquivo temporário + os.replace no final, para que o JSON principal não seja corrompido em caso de parada inesperada.
  3. Se a interrupção for inevitável (por exemplo, script muito longo e risco de quedas de energia), considere capturar sinais ou armazenar em memória e só escrever no final.

Com isso, você evita (ou minimiza muito) o erro de JSON quebrado. E, se ocorrer, consegue retomar sem perder dados.

Pergunta

nao gostei de nenhuma das soluções. encontre outra melhor.

Pensamento

Considerando opções

Estou pensando em uma abordagem baseada em bloqueio, usando um banco de dados ou jsonlines. Outra ideia é armazenar cada item individualmente em arquivos JSON separados.

Procurando opções

Estou pensando em armazenar dados em formato de JSON delimitado por nova linha, usar um banco de dados robusto ou considerar uma fila em memória para evitar escrituras parciais.

Reparando e recuperando

Estou pensando em uma solução que, ao iniciar, lê JSON parcialmente corrompido e corrige automaticamente. Isso reescreve um array válido quando necessário, similar ao NDJSON usado em logs.

Pensou por 10 segundos

Resposta

Uma abordagem mais resiliente, prática e simples de implementar (superando o esquema de “arquivo temporário” e o “fechamento manual do array JSON”) é usar “NDJSON” (newline-delimited JSON), também conhecido como “JSON lines”.

Em vez de manter um array gigante no arquivo extracted_data.json, cada objeto fica numa linha separada, com a seguinte forma:

text
{"url": "...", "title": "...", "infobox": {...}, "sections": {...}} {"url": "...", "title": "...", "infobox": {...}, "sections": {...}} {"url": "...", "title": "...", "infobox": {...}, "sections": {...}} ...

Ou seja, não tem [ no início, ] no final, nem vírgulas entre os objetos. Cada linha é um JSON completo e independente.

Vantagens do NDJSON

  1. Resiliência a interrupções: se o processo for interrompido durante a gravação de uma linha, você só perde essa última linha. Todas as anteriores continuam sendo JSON válido.
  2. Facilidade de leitura: para ler, basta ir linha a linha, fazendo json.loads(linha). Se alguma linha estiver corrompida, você pode ignorá-la ou tentar corrigi-la, mas ainda assim lê todas as demais.
  3. Acréscimo fácil: para adicionar dados, basta abrir o arquivo em modo append ("a"), gravar uma linha (cada item que você processa), e fechar. Não é preciso regravar um array inteiro.
  4. Concilia bem com logs e fluxos de dados contínuos, pois cada item é independente.

Como implementar NDJSON no seu caso

Você pode simplificar bastante o seu writer_process. Em vez de abrir o arquivo em modo "w" e ficar se preocupando em iniciar [ e fechar ], faça algo assim:

python
import json import os NDJSON_FILENAME = "extracted_data.json" def writer_process_ndjson(filename, data_queue, existing_data): """ Processo responsável por escrever dados no formato NDJSON (um objeto JSON por linha). """ print(f"[Writer] Iniciando escrita NDJSON em {filename}") # 1) Escreve dados antigos (se quiser reaproveitar) # Ou, se quiser “aproveitar” os existentes, basta pular esta parte: # - Você pode já ter esse arquivo no formato NDJSON. # - Ou converter de array JSON pra NDJSON uma vez e pronto. # Se "existing_data" for um array com objetos, você pode converter e anotar # ATENÇÃO: Só faça isso se for a *primeira* vez que cria o NDJSON, # senão pode duplicar dados! if existing_data: with open(filename, "w", encoding="utf-8") as f: for item in existing_data: json_line = json.dumps(item, ensure_ascii=False) f.write(json_line + "\n") else: # Se não tem dados anteriores, só garanta que o arquivo existe # e não está corrompido. Abre com "a" para não apagar nada já escrito. open(filename, "a", encoding="utf-8").close() # 2) Agora vamos ficar lendo a fila e escrevendo um objeto por linha # em modo append. with open(filename, "a", encoding="utf-8") as f: while True: data = data_queue.get() if data == "DONE": break json_line = json.dumps(data, ensure_ascii=False) f.write(json_line + "\n") f.flush() # escreve imediatamente print(f"[Writer] Dados da página '{data.get('title')}' salvos.") print(f"[Writer] Escrita concluída no arquivo NDJSON '{filename}'.")

Como ler esse NDJSON depois?

Supondo que você tenha esse arquivo extracted_data.json com cada linha sendo um JSON:

python
import json def load_ndjson(filename): data_list = [] with open(filename, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) data_list.append(obj) except json.JSONDecodeError: # Se uma linha estiver corrompida, você pode ignorar # ou tentar consertar. Aqui, só ignoramos. pass return data_list

Isso te devolve uma lista de objetos que estavam no arquivo, sem se preocupar se, no final, tinha ou não ], vírgula etc.


Ajuste no seu main()

Como agora não precisamos mais “reabrir” o array JSON existente, a função que carrega dados existentes não precisa criar um processed_urls usando a leitura de json.load(...) de uma array. Em vez disso, você só percorre o NDJSON linha a linha, monta a lista e extrai os urls.

Exemplo de load_existing_data_ndjson:

python
def load_existing_data_ndjson(filename): """ Lê o NDJSON (um JSON por linha) e retorna (existing_data, processed_urls). Se não existir, retorna listas/conjuntos vazios. """ if not os.path.exists(filename): return [], set() existing_data = [] processed_urls = set() with open(filename, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) existing_data.append(obj) if "url" in obj: processed_urls.add(obj["url"]) except json.JSONDecodeError: # Ignora linha corrompida pass print(f"[Info] {len(existing_data)} itens existentes carregados de '{filename}'.") return existing_data, processed_urls

Depois, no main(), em vez de writer = Process(target=writer_process, ...), chame:

python
writer = Process(target=writer_process_ndjson, args=(JSON_FILENAME, data_queue, existing_data))

E pronto. Todo o resto (producer/consumer, fila de links, etc.) permanece igual. Você só muda a forma de escrita/leitura.


Por que isso resolve (ou mitiga) seu problema?

  1. Interrupção abrupta: se o script for morto no meio de json_line = json.dumps(data, ...), não tem problema—na pior das hipóteses você perde somente o objeto que estava sendo gravado no momento, e não corrompe nada do que veio antes.
  2. Reinício: quando o script roda novamente, você carrega quantas linhas já estavam válidas (ou seja, uma por objeto). Se a última linha estiver incompleta ou corrompida, você simplesmente ignora.
  3. Sem Expecting ',' delimiter: agora você não tem colchetes nem vírgulas para manter a sintaxe de array. É sempre um objeto por linha. Menos chance de quebrar.
  4. Escalabilidade: NDJSON é bem comum para logs ou coleta incremental de dados, pois você não precisa reescrever tudo para adicionar itens novos.

Conclusão

Se você “não gostou” de ter que arrumar manualmente, usar arquivo temporário ou capturar sinais, a melhor solução para gravar incrementalmente JSON de modo robusto é usar NDJSON. Ele elimina 99% dos problemas de corromper arquivos JSON por não precisar de delimitadores de array.

No final, se você precisar de um array formal, pode converter (uma única vez) chamando load_ndjson(...) e depois json.dump(...) para outro arquivo. Entretanto, no dia a dia, NDJSON é totalmente utilizável por bibliotecas e por scripts Python, e de fato é a forma mais tranquila de escrever dados JSON em alto volume sem corromper o arquivo a cada interrupção.

Pergunta

perfeito gostei. integre isso no codigo completo por favor.

Pensamento

Reorganizando para NDJSON

Estou ajustando o código original para trabalhar com NDJSON, mudando nomes e processos para focar em armazenamento e leitura de NDJSON, mantendo outras funcionalidades intactas.

Ajustando a leitura e gravação
Estou trabalhando nas funções de leitura e escrita para NDJSON, cuidando para evitar duplicação de linhas e mantendo a integridade dos dados existentes.

Guardando este conceito

Estou optando por não reescrever existing_data, evitando duplicatas, e apenas abrindo em modo de anexação, deixando a integração do código mais tranquila e eficaz.

Pensou por 25 segundos

Resposta

A seguir está uma versão completa do seu código, adaptada para usar NDJSON (newline-delimited JSON) em vez de um único array JSON. Assim, cada objeto (página extraída) será gravado em uma linha individual no arquivo extracted_data.json. Isso elimina a necessidade de lidar com colchetes ([ e ]) ou vírgulas entre itens e evita corromper o arquivo caso o script seja interrompido bruscamente.

<br>

Como funciona a nova versão:

  1. Formato NDJSON:

    • Cada objeto é escrito como uma única linha JSON no arquivo extracted_data.json.
    • Não há [ ou ], nem vírgulas entre objetos.
  2. Carregamento de dados existentes:

    • A função load_existing_data_ndjson lê o arquivo linha a linha, usando json.loads.
    • Se alguma linha estiver corrompida, apenas é ignorada (você pode alterar esse comportamento se quiser).
    • Ao final, você tem existing_data (lista dos objetos válidos) e processed_urls (conjunto com as URLs já processadas).
  3. Processo de escrita (writer_process_ndjson):

    • Abre o arquivo em modo append ("a") e, para cada novo objeto recebido na fila, escreve uma nova linha JSON.
    • Se o arquivo não existir, ele será criado vazio.
    • Se você não quiser duplicar dados antigos, basta não regravar existing_data no início (porque supostamente eles já estão no arquivo). Caso estivesse migrando de um formato anterior, poderia converter uma única vez, mas a cada execução não precisa reescrever.
  4. Resiliência:

    • Se o processo for interrompido enquanto escreve uma linha, na pior das hipóteses só essa última linha fica parcial/corrompida, não afetando o resto.
    • Na próxima execução, essas linhas válidas anteriores são reaproveitadas, e você não perde tudo.

python
import json from urllib.parse import urljoin import time import undetected_chromedriver as uc from selenium.common.exceptions import WebDriverException, NoSuchElementException from selenium.webdriver.common.by import By import os from bs4 import BeautifulSoup # Biblioteca de multiprocessing from multiprocessing import Process, Queue, current_process from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC # Nome do arquivo NDJSON (um JSON por linha) JSON_FILENAME = "extracted_data.json" # Cada processo terá seu próprio driver (variável global por processo) driver = None ######################################################################## # 1. Ler NDJSON existente (caso exista), guardar dados e URLs processadas ######################################################################## def load_existing_data_ndjson(filename): """ Lê o arquivo NDJSON (um objeto JSON por linha) e retorna (existing_data, processed_urls). - existing_data: lista dos itens já presentes no NDJSON. - processed_urls: set com as URLs já processadas. Se não existir ou estiver vazio, retorna listas/sets vazios. Se encontrar linhas corrompidas, ignora apenas essas linhas. """ if not os.path.exists(filename): return [], set() existing_data = [] processed_urls = set() with open(filename, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) existing_data.append(obj) if "url" in obj: processed_urls.add(obj["url"]) except json.JSONDecodeError: # Se a linha estiver corrompida, apenas ignoramos pass print(f"[Info] {len(existing_data)} itens existentes carregados de '{filename}'.") return existing_data, processed_urls ######################################################################## # 2. Processo de escrita usando NDJSON ######################################################################## def writer_process_ndjson(filename, data_queue, existing_data): """ Processo responsável por escrever cada item como uma linha JSON independente (NDJSON). - Se o arquivo não existir, cria vazio. - Abre em modo 'append' para não sobrescrever linhas já existentes. - Se você quiser migrar de um formato antigo, poderia converter `existing_data` e escrever aqui, mas tome cuidado para não duplicar linhas cada vez que rodar. """ print(f"[Writer] Iniciando escrita NDJSON em {filename}") # Se for a *primeira vez* do zero e quiser "injectar" existing_data no arquivo, # tome cuidado para não duplicar caso o arquivo já contenha dados. Exemplo: # - Se você estiver migrando do formato array e o arquivo estiver vazio, pode querer gravar isso: # with open(filename, "w", encoding="utf-8") as f: # for item in existing_data: # f.write(json.dumps(item, ensure_ascii=False) + "\n") # # - Se estiver apenas rodando várias vezes (e o arquivo já contém dados), geralmente # não faça nada aqui para não duplicar. Vamos supor que os dados antigos já estão lá. # Aqui, só nos certificamos de que o arquivo existe (se não, cria) open(filename, "a", encoding="utf-8").close() # Agora, abrimos em modo APPEND para escrever as novas páginas with open(filename, "a", encoding="utf-8") as f: while True: data = data_queue.get() if data == "DONE": break # Cada item convertido em string JSON e escrito numa linha json_line = json.dumps(data, ensure_ascii=False) f.write(json_line + "\n") f.flush() # força gravação imediata print(f"[Writer] Dados da página '{data.get('title')}' salvos.") print(f"[Writer] Escrita concluída no arquivo NDJSON '{filename}'.") ######################################################################## # Selenium / Chromedriver ######################################################################## def get_driver(): """ Inicializa o `undetected_chromedriver` para cada processo se ainda não houver um driver, com configurações otimizadas para carregamento mais rápido. """ global driver if driver is None: options = uc.ChromeOptions() # Estratégia de carregamento 'eager' options.page_load_strategy = 'eager' options.add_argument('--headless=new') options.add_argument('window-size=1920,1080') options.add_argument("--disable-gpu") options.add_argument('--disable-extensions') options.add_argument('--ignore-certificate-errors') # Desabilita imagens no Chrome para acelerar o carregamento chrome_prefs = {} options.experimental_options["prefs"] = chrome_prefs chrome_prefs["profile.default_content_settings"] = {"images": 2} chrome_prefs["profile.managed_default_content_settings.images"] = 2 driver_kwargs = {'timeout': 600} driver = uc.Chrome(options=options, use_subprocess=True, **driver_kwargs) return driver def close_driver(): """ Fecha o driver Selenium para o processo atual. """ global driver if driver is not None: try: driver.quit() except: pass driver = None ######################################################################## # Lógica de extração de páginas ######################################################################## def fetch_page_selenium(url, retries=3, backoff_factor=1.2): """ Usa o Selenium para carregar a URL e retorna o driver, com algumas tentativas e backoff exponencial leve. """ drv = get_driver() timeout = 60 drv.set_page_load_timeout(timeout) for attempt in range(retries): try: print(f"[{current_process().name}] Tentativa {attempt+1} de acessar {url}") drv.get(url) # Aguarda apenas a presença do contêiner principal (mw-parser-output) WebDriverWait(drv, 5).until( EC.presence_of_element_located((By.CLASS_NAME, "mw-parser-output")) ) return drv except WebDriverException as e: print(f"[{current_process().name}] Erro ao acessar {url}: {e}") if attempt < retries - 1: wait_time = (backoff_factor ** attempt) print(f"[{current_process().name}] Tentando novamente em {wait_time:.1f} segundos...") time.sleep(wait_time) else: print(f"[{current_process().name}] Falha ao acessar {url} após {retries} tentativas.") raise def parse_page(url): """ Extrai informações detalhadas e estruturadas de uma página usando Selenium e BeautifulSoup. """ print(f"[{current_process().name}] Processando página: {url}") drv = fetch_page_selenium(url) html = drv.page_source soup = BeautifulSoup(html, 'html.parser') # Extrair o título try: title_elem = soup.find(id='firstHeading') title = title_elem.text.strip() if title_elem else '' except Exception as e: print(f"[{current_process().name}] Incapaz de encontrar o título para {url}: {e}") title = '' # Extrair dados da infobox infobox_data = {} try: infobox = soup.find('table', {'class': lambda x: x and 'infobox' in x}) if infobox: for tr in infobox.find_all('tr'): if tr.th and tr.td: key = tr.th.get_text(strip=True) value = tr.td.get_text(strip=True) infobox_data[key] = value except Exception as e: print(f"[{current_process().name}] Incapaz de extrair infobox para {url}: {e}") # Extrair seções e conteúdo content_sections = {} try: content_div = soup.find('div', {'class': 'mw-parser-output'}) if content_div: current_section = '' for element in content_div.children: if element.name in ['h2', 'h3', 'h4']: section_title = element.get_text(strip=True).replace('[editar]', '').strip() current_section = section_title content_sections[current_section] = '' elif element.name == 'p': text = element.get_text(strip=True) if current_section: content_sections[current_section] += text + '\n' else: # Conteúdo antes de qualquer seção if 'Introduction' in content_sections: content_sections['Introduction'] += text + '\n' else: content_sections['Introduction'] = text + '\n' elif element.name == 'ul': items = [li.get_text(strip=True) for li in element.find_all('li')] if current_section: content_sections[current_section] += '\n'.join(items) + '\n' elif element.name == 'table': table_data = [] headers = [] rows = element.find_all('tr') header_row = element.find('tr') if header_row: header_cells = header_row.find_all(['th', 'td']) headers = [cell.get_text(strip=True) for cell in header_cells] for row in rows[1:]: cells = row.find_all(['th', 'td']) if len(cells) == len(headers): row_data = { headers[i]: cells[i].get_text(strip=True) for i in range(len(headers)) } table_data.append(row_data) else: row_data = [cell.get_text(strip=True) for cell in cells] table_data.append(row_data) if current_section: content_sections[current_section + ' - Tabela'] = table_data except Exception as e: print(f"[{current_process().name}] Incapaz de extrair conteúdo para {url}: {e}") return { 'url': url, 'title': title, 'infobox': infobox_data, 'sections': content_sections } ######################################################################## # 3. Worker que processa cada página ######################################################################## def page_worker(worker_id, links_queue, data_queue): """ Processo responsável por consumir URLs da links_queue, extrair dados e colocar na data_queue. """ print(f"[Worker {worker_id}] Iniciando.") while True: url = links_queue.get() if url == "DONE": # Recoloca o "DONE" para os próximos workers e sai links_queue.put("DONE") break else: try: page_data = parse_page(url) data_queue.put(page_data) print(f"[Worker {worker_id}] Página processada: {page_data.get('title')}") except Exception as e: print(f"[Worker {worker_id}] Erro ao processar {url}: {e}") close_driver() print(f"[Worker {worker_id}] Finalizando e driver fechado.") ######################################################################## # 4. Processo que extrai todos os links de Special:AllPages ######################################################################## def get_all_page_links(links_queue, processed_urls): """ Extrai todos os links das páginas a partir de 'Special:AllPages' usando Selenium. Coloca os links na links_queue e, ao final, insere "DONE" na fila. Pula URLs já contidas em processed_urls. """ print(f"[{current_process().name}] Iniciando extração de links.") base_url = "https://oldschool.runescape.wiki/" all_pages_url = urljoin(base_url, "w/Special:AllPages") drv = get_driver() try: drv.set_page_load_timeout(30) drv.get(all_pages_url) WebDriverWait(drv, 5).until( EC.presence_of_element_located((By.CLASS_NAME, 'mw-allpages-body')) ) except Exception as e: print(f"[{current_process().name}] Erro inicial ao abrir Special:AllPages: {e}") total_pages_found = 0 while True: print(f"[{current_process().name}] Extraindo links da página: {all_pages_url}") try: WebDriverWait(drv, 5).until( EC.presence_of_element_located((By.CLASS_NAME, 'mw-allpages-body')) ) content_div = drv.find_element(By.CLASS_NAME, 'mw-allpages-body') a_elements = content_div.find_elements(By.TAG_NAME, 'a') for a in a_elements: href = a.get_attribute('href') # Filtra links: if href.startswith(base_url + 'w/') and not href.startswith(base_url + 'w/Special:'): if href not in processed_urls: links_queue.put(href) total_pages_found += 1 except Exception as e: print(f"[{current_process().name}] Incapaz de encontrar o conteúdo da página: {e}") # Tenta achar o link 'Next page' try: next_link = drv.find_element(By.PARTIAL_LINK_TEXT, 'Next page') if next_link: all_pages_url = next_link.get_attribute('href') drv.get(all_pages_url) else: break except NoSuchElementException: print(f"[{current_process().name}] Nenhum link 'Next page' encontrado.") break except Exception as e: print(f"[{current_process().name}] Erro ao encontrar o link 'Next page': {e}") break print(f"[{current_process().name}] Total de novas páginas encontradas: {total_pages_found}") links_queue.put("DONE") close_driver() print(f"[{current_process().name}] Extração de links concluída e driver fechado.") ######################################################################## # 5. Função principal ######################################################################## def main(): # 5.1 Carrega dados existentes (NDJSON) e URLs já processadas existing_data, processed_urls = load_existing_data_ndjson(JSON_FILENAME) # 5.2 Cria as filas de comunicação entre processos links_queue = Queue() data_queue = Queue() # 5.3 Inicia o processo escritor (formato NDJSON) writer = Process(target=writer_process_ndjson, args=(JSON_FILENAME, data_queue, existing_data)) writer.start() # 5.4 Inicia o processo de extração de links, passando as URLs já processadas link_proc = Process(target=get_all_page_links, args=(links_queue, processed_urls)) link_proc.start() # 5.5 Inicia processos de trabalho para processar as páginas num_workers = 5 workers = [] for i in range(num_workers): p = Process(target=page_worker, args=(i, links_queue, data_queue)) p.start() workers.append(p) # 5.6 Aguarda a extração de links finalizar link_proc.join() print("Extração de links concluída.") # 5.7 Aguarda todos os workers finalizarem for w in workers: w.join() print("Todos os workers foram finalizados.") # 5.8 Sinaliza para o processo escritor que não há mais dados data_queue.put("DONE") # 5.9 Aguarda o processo de escrita finalizar antes de encerrar o script writer.join() print("Processo de escrita finalizado.") print("Extração concluída com sucesso.") # Execução if __name__ == "__main__": main()

Observações importantes

  1. Não há mais [ e ] no arquivo. Cada página gerada é gravada como uma linha JSON completa.
  2. Se você já tinha um extracted_data.json em formato de array, faça a conversão uma única vez para NDJSON (salvando item a item em linhas separadas). Depois, não regrave existing_data toda vez (ou duplicará).
  3. Se o processo for interrompido antes de terminar de escrever uma linha, apenas essa linha pode ficar corrompida. As demais continuam válidas.
  4. Para recarregar tudo em Python, basta algo como:
    python
    def read_ndjson(filename): objs = [] with open(filename, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: objs.append(json.loads(line)) except json.JSONDecodeError: pass # ignorar linhas quebradas return objs

Esse é, em geral, o modo mais robusto de gravar dados JSON incrementais sem correr o risco de corromper tudo caso seja interrompido.

Pergunta

como fazer download de um site inteiro rapidamente? download de todas paginas url presentes naquele dominio.

Pensamento

Explorando opções

Estou listando ferramentas como HTTrack e Wget para baixar sites, destacando o comando wget --mirror. Interessado por uma abordagem Python?

Pensou por 19 segundos

Resposta

Para baixar (fazer mirror) de um site inteiro – ou o mais próximo disso – existem algumas opções consagradas. A maneira mais rápida e simples geralmente não é por scripts Python, mas sim usando ferramentas especializadas, como Wget ou HTTrack. Abaixo explico as principais formas.


1. Usar Wget (linha de comando)

O Wget é uma ferramenta clássica em sistemas Unix/Linux (também há versões para Windows). Ele consegue baixar recursivamente as páginas de um site, seguindo os links internos, e gravar tudo em pastas locais.

Exemplo de comando para “espelhar” um site:

bash
wget \ --mirror \ --convert-links \ --adjust-extension \ --page-requisites \ --no-parent \ http://exemplo.com/

Explicando as opções principais

  • --mirror
    Ativa um conjunto de opções para “espelhar” um site (equivale a -r -N -l inf --no-remove-listing).
  • --convert-links
    Converte os links nos arquivos HTML baixados para apontar para o local, de forma que você possa navegar off-line.
  • --adjust-extension
    Ajusta a extensão dos arquivos HTML para .html (às vezes o servidor não tem extensão, e isso facilita abrir).
  • --page-requisites
    Baixa arquivos “embutidos” na página, como imagens, CSS, JavaScript, etc., necessários para renderizar corretamente.
  • --no-parent
    Evita subir pastas acima do caminho inicial (impede que vá baixando sites “pais” ou subdomínios fora do escopo).

Depois que Wget concluir, você terá uma pasta local com todos os arquivos do site (ou quase todos). Basta abrir o arquivo index.html principal no navegador para navegar offline.

Observação: se o site for muito grande, isso pode levar bastante tempo e muito espaço. Ajuste ou interrompa caso note que está baixando milhares de páginas.


2. Usar HTTrack (com interface gráfica ou linha de comando)

O HTTrack é outra ferramenta clássica, disponível no Windows e em outras plataformas, que automatiza esse processo e oferece interface gráfica. Você pode escolher:

  1. New Project (Novo Projeto)
  2. Digitar a URL inicial do site
  3. Selecionar o nível de profundidade e outras configurações
  4. Clicar em Finish
    Ele vai criar uma cópia local do site, tentando manter a mesma estrutura de links.

A vantagem do HTTrack é ter GUI (fácil para quem não quer linha de comando) e opções avançadas de filtragem (incluir/excluir certos caminhos, tipos de arquivo, etc.).


3. Usar Python (Scrapy, por exemplo)

Se você preferir um código Python para ter mais controle – e não só baixar páginas, mas também processá-las, extrair dados etc. – pode usar Scrapy. Ele é um framework de web crawling. Entretanto, não é a forma “mais rápida” para baixar tudo mecanicamente: Scrapy é mais voltado a extração de dados estruturada.

Um script básico Scrapy (em pseudo-código) seria:

bash
pip install scrapy scrapy startproject meu_projeto cd meu_projeto scrapy genspider site_inteiro exemplo.com

No arquivo gerado em spiders/site_inteiro.py, você poderia ter algo como:

python
import scrapy class SiteInteiroSpider(scrapy.Spider): name = 'site_inteiro' allowed_domains = ['exemplo.com'] start_urls = ['http://exemplo.com/'] def parse(self, response): # Salva a página HTML em disco pagina_nome = response.url.split("/")[-1] or "index.html" with open(pagina_nome, "wb") as f: f.write(response.body) # Segue todos os links internos for link in response.css('a::attr(href)'): url = link.get() if url.startswith('http://exemplo.com'): yield scrapy.Request(url, callback=self.parse)

Depois, você roda:

bash
scrapy crawl site_inteiro

Isso não é tão imediato quanto Wget/HTTrack, e precisaria de ajustes (por exemplo, criar subpastas, ajustar links relativos, baixar imagens, etc.). Mas é útil se você pretende customizar a maneira de salvar ou processar cada página.


4. Observações de performance e limitações

  1. Sites dinâmicos (com JavaScript pesado, single-page apps) podem não ser baixados corretamente apenas com Wget/HTTrack, pois esses programas não executam JavaScript complexo. Se o site for baseado em AJAX, React, Vue, Angular etc., você pode ter páginas incompletas. Neste caso, seria necessário algo como Selenium ou Playwright com abordagem de “navegador headless” que renderiza JS – mas aí a tarefa fica muito mais pesada e demorada.

  2. Sites gigantes podem ter milhares ou milhões de páginas. Verifique o robots.txt e a política do site (termos de uso) para ver se é permitido “espelhar” o site. Caso contrário, seu IP pode ser bloqueado ou você pode até infringir termos legais.

  3. Velocidade de download:

    • Ferramentas como Wget e HTTrack costumam ser bem rápidas no que fazem.
    • Se o servidor do site tem limitações de banda ou detecta um volume muito grande de requisições, ele pode bloquear seu IP temporariamente.
    • Se for seu próprio site, ou você tiver autorização, e for estático, então não há grandes problemas.
  4. Uso offline: depois de baixar, você pode abrir localmente. Se o site for muito grande, espere muitas pastas e subpastas.


Resumindo

  • Para baixar tudo de forma rápida, use “wget --mirror” ou HTTrack.
  • Se o site tiver JavaScript avançado, você não capturará tudo perfeitamente, mas sim a versão estática das páginas.
  • Ferramentas como Scrapy ou Selenium são mais úteis para rastreamento customizado ou para sites dinâmicos, porém são mais lentas e dão mais trabalho para reproduzir a mesma “estrutura offline”.

Em boa parte dos casos, a maneira mais prática é realmente:

bash
wget --mirror --convert-links --adjust-extension --page-requisites --no-parent http://exemplo.com/

Isso vai produzir uma cópia navegável offline (HTML e arquivos relacionados) na sua máquina, de forma rápida e fácil.

Compartilhar esta Q&A