""" ======================================================================== Web Watcher — Motor de monitorización de URLs externas Buscador Multisearch — Entrega 6 (monitorización externa) ======================================================================== Visita periódicamente las URLs registradas en `enlaces_proceso`, detecta cambios y archivos nuevos (PDF/DOCX/DOC), y dispara la misma pipeline que el buscador de boletines: clasificador de fases + parser de bases + motor de notificaciones. Diseño: - **Educado con los servidores**: User-Agent identificable, rate limit de 1 req/10s por dominio, respeto a robots.txt, cache HTTP. - **Detección robusta de cambios**: normaliza el HTML antes de hashear (elimina scripts, timestamps, tokens) para evitar falsos cambios. - **Detección directa de archivos nuevos**: extrae todos los enlaces a PDF/DOCX y compara con la captura anterior. Si aparece uno nuevo, lo descarga. - **Heurística para procesar archivos descargados**: por nombre decide si pasarlo al parser de bases o al clasificador de sub-fases. Uso típico desde un job programado (cron, celery, etc.): from web_watcher import WebWatcher watcher = WebWatcher(conn, almacen_dir='/var/lib/buscador/archivos') watcher.monitorizar_pendientes(max_enlaces=50) Dependencias: pip install requests beautifulsoup4 psycopg2-binary """ import os import re import time import hashlib import logging import mimetypes from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import Optional, Iterable from urllib.parse import urljoin, urlparse, unquote from urllib.robotparser import RobotFileParser import requests from bs4 import BeautifulSoup logger = logging.getLogger(__name__) # ======================================================================== # Configuración global # ======================================================================== USER_AGENT = "Buscador-Multisearch/1.0 (+https://buscador.example/about-bot)" TIMEOUT_HTTP = 30 RATE_LIMIT_SEGUNDOS_POR_DOMINIO = 10 MAX_BYTES_DESCARGA_HTML = 5 * 1024 * 1024 # 5 MB MAX_BYTES_DESCARGA_PDF = 50 * 1024 * 1024 # 50 MB ROBOTS_TXT_CACHE_HORAS = 24 SNAPSHOT_TEXTO_MAX_CHARS = 500_000 # 500 KB de texto guardado EXTENSIONES_DOCUMENTO = {'.pdf', '.docx', '.doc', '.xlsx', '.xls', '.odt'} # Patrones para detectar tipo de archivo por nombre PATRONES_TIPO_ARCHIVO = [ (re.compile(r'\b(bases|convocatoria|programa|temario)\b', re.IGNORECASE), 'bases'), (re.compile(r'\b(admitidos|excluidos|admisi[oó]n)\b', re.IGNORECASE), 'lista_admitidos'), (re.compile(r'\b(tribunal|composici[oó]n)\b', re.IGNORECASE), 'tribunal'), (re.compile(r'\b(calificaciones|notas|puntuaciones|resultados)\b', re.IGNORECASE), 'resultados'), (re.compile(r'\b(plantilla|respuestas\s+correctas)\b', re.IGNORECASE), 'plantilla'), (re.compile(r'\b(aprobados|nombramiento|propuesta)\b', re.IGNORECASE), 'aprobados'), (re.compile(r'\b(correcci[oó]n|errata|rectificaci[oó]n)\b', re.IGNORECASE), 'correccion_errores'), (re.compile(r'\bampliaci[oó]n\b', re.IGNORECASE), 'ampliacion_plazo'), ] # ======================================================================== # Estructuras de datos # ======================================================================== @dataclass class EnlacePendiente: id_enlace: str id_proceso: str url: str dominio: str tipo: str ultimo_hash: Optional[str] estado_proceso: str @dataclass class ResultadoCheck: """Resultado de comprobar una URL.""" id_enlace: str estado: str # 'sin_cambios', 'cambio_detectado', 'archivo_nuevo', # 'error_red', 'error_robots', 'error_4xx', 'error_5xx' hash_nuevo: Optional[str] = None texto_extraido: Optional[str] = None archivos_nuevos: list = None # list[ArchivoNuevo] archivos_eliminados: list = None # list[str] de URLs bytes_descargados: int = 0 codigo_http: Optional[int] = None content_type: Optional[str] = None error: Optional[str] = None @dataclass class ArchivoNuevo: url: str nombre: str tipo: str # 'pdf', 'docx', 'doc', 'xlsx', 'otro' # ======================================================================== # Motor principal # ======================================================================== class WebWatcher: """Monitoriza URLs externas y detecta cambios y archivos nuevos.""" def __init__(self, conn, almacen_dir: str = '/var/lib/buscador/archivos'): self.conn = conn self.almacen_dir = almacen_dir os.makedirs(self.almacen_dir, exist_ok=True) # Sesión HTTP reutilizable (mantiene cookies y conexiones) self.session = requests.Session() self.session.headers.update({'User-Agent': USER_AGENT}) # -------------------------------------------------------------- # Punto de entrada principal # -------------------------------------------------------------- def monitorizar_pendientes(self, max_enlaces: int = 50) -> dict: """Procesa los enlaces que tocan chequear. Devuelve dict con estadísticas: enlaces_procesados, cambios, archivos_nuevos, errores. """ stats = { 'enlaces_procesados': 0, 'sin_cambios': 0, 'cambio_detectado': 0, 'archivos_nuevos_total': 0, 'errores': 0, } enlaces = self._cargar_enlaces_pendientes(max_enlaces) logger.info(f"Procesando {len(enlaces)} enlaces pendientes") for enlace in enlaces: try: resultado = self.chequear_enlace(enlace) self._persistir_resultado(enlace, resultado) stats['enlaces_procesados'] += 1 if resultado.estado == 'sin_cambios': stats['sin_cambios'] += 1 elif resultado.estado in ('cambio_detectado', 'archivo_nuevo'): stats['cambio_detectado'] += 1 if resultado.archivos_nuevos: stats['archivos_nuevos_total'] += len(resultado.archivos_nuevos) else: stats['errores'] += 1 except Exception as e: logger.exception(f"Error procesando enlace {enlace.id_enlace}: {e}") stats['errores'] += 1 return stats # -------------------------------------------------------------- # Carga de enlaces pendientes desde BD # -------------------------------------------------------------- def _cargar_enlaces_pendientes(self, limit: int) -> list: with self.conn.cursor() as cur: cur.execute(""" SELECT id_enlace, id_proceso, url, dominio, tipo, ultimo_hash, estado_proceso FROM enlaces_para_chequear LIMIT %s """, (limit,)) return [ EnlacePendiente( id_enlace=str(r[0]), id_proceso=r[1], url=r[2], dominio=r[3], tipo=r[4], ultimo_hash=r[5], estado_proceso=r[6], ) for r in cur.fetchall() ] # -------------------------------------------------------------- # Comprobación de un enlace individual # -------------------------------------------------------------- def chequear_enlace(self, enlace: EnlacePendiente) -> ResultadoCheck: """Comprueba un enlace y devuelve qué ha cambiado.""" # 1. Verificar robots.txt if not self._robots_permite(enlace.url, enlace.dominio): return ResultadoCheck( id_enlace=enlace.id_enlace, estado='error_robots', error='robots.txt no permite acceso del bot', ) # 2. Rate limit por dominio self._respetar_rate_limit(enlace.dominio) # 3. Descargar la página try: r = self.session.get( enlace.url, timeout=TIMEOUT_HTTP, stream=True, # para limitar el tamaño allow_redirects=True, ) except requests.RequestException as e: return ResultadoCheck( id_enlace=enlace.id_enlace, estado='error_red', error=str(e)[:500], ) # 4. Verificar respuesta HTTP if 400 <= r.status_code < 500: return ResultadoCheck( id_enlace=enlace.id_enlace, estado='error_4xx', codigo_http=r.status_code, error=f"HTTP {r.status_code}", ) if r.status_code >= 500: return ResultadoCheck( id_enlace=enlace.id_enlace, estado='error_5xx', codigo_http=r.status_code, error=f"HTTP {r.status_code}", ) # 5. Leer el contenido (respetando límite de tamaño) try: contenido = self._leer_contenido_limitado(r, MAX_BYTES_DESCARGA_HTML) except Exception as e: return ResultadoCheck( id_enlace=enlace.id_enlace, estado='error_red', error=f"Error leyendo contenido: {e}", ) bytes_descargados = len(contenido) content_type = r.headers.get('Content-Type', '') # 6. Parsear HTML try: soup = BeautifulSoup(contenido, 'html.parser') except Exception as e: return ResultadoCheck( id_enlace=enlace.id_enlace, estado='error_red', codigo_http=r.status_code, error=f"Error parseando HTML: {e}", bytes_descargados=bytes_descargados, content_type=content_type, ) # 7. Calcular hash del contenido normalizado texto_normalizado = self._normalizar_html(soup) hash_nuevo = hashlib.sha256(texto_normalizado.encode('utf-8')).hexdigest() # 8. Extraer texto plano y enlaces a documentos texto_plano = soup.get_text(separator=' ', strip=True)[:SNAPSHOT_TEXTO_MAX_CHARS] archivos_detectados = self._extraer_enlaces_documentos(soup, enlace.url) # 9. Comparar con captura anterior if enlace.ultimo_hash == hash_nuevo: return ResultadoCheck( id_enlace=enlace.id_enlace, estado='sin_cambios', hash_nuevo=hash_nuevo, codigo_http=r.status_code, bytes_descargados=bytes_descargados, content_type=content_type, ) # 10. Hay cambio. Identificar archivos nuevos respecto a la # última captura. archivos_anteriores = self._cargar_archivos_anteriores(enlace.id_enlace) urls_anteriores = {a['url'] for a in archivos_anteriores} archivos_nuevos = [a for a in archivos_detectados if a.url not in urls_anteriores] urls_actuales = {a.url for a in archivos_detectados} archivos_eliminados = list(urls_anteriores - urls_actuales) estado = 'archivo_nuevo' if archivos_nuevos else 'cambio_detectado' return ResultadoCheck( id_enlace=enlace.id_enlace, estado=estado, hash_nuevo=hash_nuevo, texto_extraido=texto_plano, archivos_nuevos=archivos_nuevos, archivos_eliminados=archivos_eliminados, bytes_descargados=bytes_descargados, codigo_http=r.status_code, content_type=content_type, ) # -------------------------------------------------------------- # Persistencia del resultado # -------------------------------------------------------------- def _persistir_resultado(self, enlace: EnlacePendiente, resultado: ResultadoCheck) -> None: with self.conn.cursor() as cur: # 1. Actualizar enlace cur.execute(""" UPDATE enlaces_proceso SET ultimo_check = NOW(), proximo_check = calcular_proximo_check(id_enlace), ultimo_hash = COALESCE(%s, ultimo_hash), ultimo_cambio = CASE WHEN %s IN ('cambio_detectado', 'archivo_nuevo') THEN NOW() ELSE ultimo_cambio END, estado_ultimo_check = %s, ultimo_error = %s, num_archivos_detectados = COALESCE(%s, num_archivos_detectados) WHERE id_enlace = %s """, ( resultado.hash_nuevo, resultado.estado, resultado.estado, resultado.error, len(resultado.archivos_nuevos) if resultado.archivos_nuevos else None, enlace.id_enlace, )) # 2. Si hay cambio, crear snapshot if resultado.estado in ('cambio_detectado', 'archivo_nuevo'): archivos_json = [ {'url': a.url, 'nombre': a.nombre, 'tipo': a.tipo} for a in (resultado.archivos_nuevos or []) ] import json cur.execute(""" INSERT INTO snapshots_enlace (id_enlace, hash_contenido, codigo_http, content_type, bytes_descargados, texto_extraido, archivos_detectados, archivos_nuevos, archivos_eliminados) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( enlace.id_enlace, resultado.hash_nuevo, resultado.codigo_http, resultado.content_type, resultado.bytes_descargados, resultado.texto_extraido, json.dumps(archivos_json), [a.nombre for a in (resultado.archivos_nuevos or [])], resultado.archivos_eliminados or [], )) # 3. Si hay archivos nuevos, registrarlos para procesar for archivo in (resultado.archivos_nuevos or []): tipo_detectado = self._heuristica_tipo_archivo(archivo.nombre) cur.execute(""" INSERT INTO archivos_detectados (id_enlace, id_proceso, url_archivo, nombre_archivo, tipo) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (id_enlace, url_archivo) DO NOTHING """, ( enlace.id_enlace, enlace.id_proceso, archivo.url, archivo.nombre, archivo.tipo, )) # 4. Si hay cambio, crear evento_notificacion if resultado.estado in ('cambio_detectado', 'archivo_nuevo'): urgencia = 'alta' if resultado.estado == 'archivo_nuevo' else 'media' tipo_evento = ('archivo_externo_nuevo' if resultado.estado == 'archivo_nuevo' else 'enlace_externo_cambiado') cur.execute(""" INSERT INTO eventos_notificacion (id_proceso, tipo, urgencia, datos) VALUES (%s, %s, %s, %s::jsonb) """, ( enlace.id_proceso, tipo_evento, urgencia, f'{{"id_enlace": "{enlace.id_enlace}", "url": "{enlace.url}", "num_archivos": {len(resultado.archivos_nuevos or [])}}}', )) self.conn.commit() # -------------------------------------------------------------- # Descarga y procesamiento de archivos detectados # -------------------------------------------------------------- def procesar_archivos_pendientes(self, max_archivos: int = 20) -> dict: """Descarga y procesa archivos que aún no se han bajado al almacén.""" stats = {'descargados': 0, 'procesados': 0, 'errores': 0} with self.conn.cursor() as cur: cur.execute(""" SELECT id_archivo, id_enlace, id_proceso, url_archivo, nombre_archivo, tipo FROM archivos_detectados WHERE descargado = FALSE ORDER BY fecha_primera_deteccion LIMIT %s """, (max_archivos,)) archivos = cur.fetchall() for row in archivos: id_archivo, id_enlace, id_proceso, url_archivo, nombre, tipo = row try: self._descargar_y_procesar( str(id_archivo), id_enlace, id_proceso, url_archivo, nombre, tipo ) stats['descargados'] += 1 except Exception as e: logger.exception(f"Error procesando archivo {id_archivo}: {e}") stats['errores'] += 1 return stats def _descargar_y_procesar(self, id_archivo: str, id_enlace: str, id_proceso: str, url_archivo: str, nombre: str, tipo: str) -> None: """Descarga el archivo, calcula su hash y decide qué procesador usar.""" dominio = urlparse(url_archivo).netloc self._respetar_rate_limit(dominio) try: r = self.session.get(url_archivo, timeout=TIMEOUT_HTTP, stream=True) r.raise_for_status() contenido = self._leer_contenido_limitado(r, MAX_BYTES_DESCARGA_PDF) except Exception as e: with self.conn.cursor() as cur: cur.execute(""" UPDATE archivos_detectados SET requiere_revision = TRUE, motivo_revision = %s WHERE id_archivo = %s """, (f"Error descargando: {str(e)[:300]}", id_archivo)) self.conn.commit() return # Calcular hash SHA-256 y ruta de almacenamiento hash_archivo = hashlib.sha256(contenido).hexdigest() ahora = datetime.now() nombre_limpio = re.sub(r'[^\w.\-]', '_', nombre)[:200] ruta_relativa = (f"{id_proceso}/{ahora.year}/{ahora.month:02d}/" f"{hash_archivo[:16]}_{nombre_limpio}") ruta_completa = os.path.join(self.almacen_dir, ruta_relativa) os.makedirs(os.path.dirname(ruta_completa), exist_ok=True) with open(ruta_completa, 'wb') as f: f.write(contenido) # Heurística para decidir procesador tipo_archivo = self._heuristica_tipo_archivo(nombre) procesador = None if tipo_archivo == 'bases': procesador = 'parser_bases' elif tipo_archivo in ('lista_admitidos', 'tribunal', 'resultados', 'plantilla', 'aprobados', 'correccion_errores', 'ampliacion_plazo'): procesador = 'clasificador_subfases' # Marcar como descargado en BD with self.conn.cursor() as cur: cur.execute(""" UPDATE archivos_detectados SET descargado = TRUE, descargado_a = %s, hash_archivo = %s, bytes = %s, fecha_descarga = NOW(), requiere_revision = CASE WHEN %s IS NULL THEN TRUE ELSE FALSE END, motivo_revision = CASE WHEN %s IS NULL THEN 'No se reconoce el tipo de archivo, revisar manualmente' ELSE NULL END WHERE id_archivo = %s """, (ruta_relativa, hash_archivo, len(contenido), procesador, procesador, id_archivo)) self.conn.commit() # Si hay procesador, dispararlo if procesador == 'parser_bases': self._procesar_con_parser_bases(id_archivo, ruta_completa, id_proceso) elif procesador == 'clasificador_subfases': self._procesar_con_clasificador(id_archivo, ruta_completa, id_proceso, tipo_archivo) def _procesar_con_parser_bases(self, id_archivo: str, ruta_pdf: str, id_proceso: str) -> None: """Llama al parser de bases y persiste el resultado.""" # Implementación: importar ParserBases del archivo 04 y ejecutar # parser.parsear(ruta_pdf). Aquí se da el esqueleto. try: from parser_bases import ParserBases parser = ParserBases() resultado = parser.parsear(ruta_pdf) # Persistir resultado en archivos_detectados.resultado_proceso # y eventualmente crear el temario en BD (tabla temarios). with self.conn.cursor() as cur: cur.execute(""" UPDATE archivos_detectados SET procesado = TRUE, procesador_usado = 'parser_bases', resultado_proceso = %s::jsonb, fase_detectada = 'bases', fecha_procesamiento = NOW() WHERE id_archivo = %s """, ( '{"parser_ok": ' + str(resultado.parser_ok).lower() + ', "num_temas": ' + str(resultado.num_temas_total) + '}', id_archivo )) # Crear evento de notificación para la fase de bases cur.execute(""" INSERT INTO eventos_notificacion (id_proceso, tipo, fase, numero_fase, urgencia, datos) VALUES (%s, 'cambio_fase', 'bases', 2, 'critica', '{}'::jsonb) """, (id_proceso,)) self.conn.commit() except Exception as e: logger.exception(f"Error parser_bases en archivo {id_archivo}: {e}") def _procesar_con_clasificador(self, id_archivo: str, ruta_pdf: str, id_proceso: str, tipo_archivo: str) -> None: """Marca el archivo con la sub-fase detectada por su nombre.""" # Mapeo de tipo_archivo a sub-fase del modelo de 10 fases mapeo_subfase = { 'lista_admitidos': 'lista_provisional_admitidos', 'tribunal': 'composicion_tribunal', # modificador 'resultados': 'resultados', 'plantilla': 'resultados', 'aprobados': 'relacion_aprobados', 'correccion_errores': None, # modificador 'ampliacion_plazo': None, # modificador } fase = mapeo_subfase.get(tipo_archivo) modificador = (tipo_archivo if tipo_archivo in ('correccion_errores', 'ampliacion_plazo', 'tribunal') else None) with self.conn.cursor() as cur: cur.execute(""" UPDATE archivos_detectados SET procesado = TRUE, procesador_usado = 'clasificador_subfases', fase_detectada = %s, modificador_detectado = %s, fecha_procesamiento = NOW() WHERE id_archivo = %s """, (fase, modificador, id_archivo)) cur.execute(""" INSERT INTO eventos_notificacion (id_proceso, tipo, fase, modificador, urgencia, datos) VALUES (%s, 'cambio_fase', %s, %s, 'alta', '{}'::jsonb) """, (id_proceso, fase, modificador)) self.conn.commit() # -------------------------------------------------------------- # Utilidades: rate limit, robots.txt, normalización # -------------------------------------------------------------- def _respetar_rate_limit(self, dominio: str) -> None: """Espera lo necesario para no superar 1 req/10s al mismo dominio.""" with self.conn.cursor() as cur: cur.execute(""" SELECT ultima_visita FROM dominios_visitados WHERE dominio = %s """, (dominio,)) row = cur.fetchone() if row: ultima = row[0] ahora = datetime.now(timezone.utc) transcurrido = (ahora - ultima).total_seconds() if transcurrido < RATE_LIMIT_SEGUNDOS_POR_DOMINIO: time.sleep(RATE_LIMIT_SEGUNDOS_POR_DOMINIO - transcurrido) cur.execute(""" INSERT INTO dominios_visitados (dominio, ultima_visita, requests_24h) VALUES (%s, NOW(), 1) ON CONFLICT (dominio) DO UPDATE SET ultima_visita = NOW(), requests_24h = CASE WHEN dominios_visitados.fecha_reset_contador < NOW() - INTERVAL '24 hours' THEN 1 ELSE dominios_visitados.requests_24h + 1 END, fecha_reset_contador = CASE WHEN dominios_visitados.fecha_reset_contador < NOW() - INTERVAL '24 hours' THEN NOW() ELSE dominios_visitados.fecha_reset_contador END """, (dominio,)) self.conn.commit() def _robots_permite(self, url: str, dominio: str) -> bool: """Comprueba robots.txt del dominio (con caché de 24h).""" with self.conn.cursor() as cur: cur.execute(""" SELECT robots_txt, robots_txt_fetched, robots_permite_bot FROM dominios_visitados WHERE dominio = %s """, (dominio,)) row = cur.fetchone() ahora = datetime.now(timezone.utc) necesita_refresh = ( row is None or row[1] is None or (ahora - row[1]) > timedelta(hours=ROBOTS_TXT_CACHE_HORAS) ) if not necesita_refresh: return row[2] # Refrescar robots.txt try: rp = RobotFileParser() rp.set_url(f"https://{dominio}/robots.txt") rp.read() permite = rp.can_fetch(USER_AGENT, url) except Exception: # Si falla la descarga, permitir por defecto (pero loguear) logger.warning(f"No se pudo descargar robots.txt de {dominio}, permitiendo") permite = True cur.execute(""" INSERT INTO dominios_visitados (dominio, robots_txt_fetched, robots_permite_bot) VALUES (%s, NOW(), %s) ON CONFLICT (dominio) DO UPDATE SET robots_txt_fetched = NOW(), robots_permite_bot = EXCLUDED.robots_permite_bot """, (dominio, permite)) self.conn.commit() return permite def _leer_contenido_limitado(self, response, max_bytes: int) -> bytes: """Lee el contenido de una respuesta HTTP respetando un límite.""" contenido = b'' for chunk in response.iter_content(chunk_size=8192): if not chunk: break contenido += chunk if len(contenido) > max_bytes: raise ValueError(f"Contenido excede {max_bytes} bytes") return contenido def _normalizar_html(self, soup: BeautifulSoup) -> str: """Normaliza HTML antes de hashear: elimina elementos volátiles.""" # Quitar scripts, estilos, meta for tag in soup(['script', 'style', 'meta', 'noscript']): tag.decompose() # Quitar atributos volátiles (tokens, ids con timestamps) for tag in soup.find_all(True): atributos_a_quitar = [] for attr in tag.attrs: if attr in ('nonce', 'data-csrf', 'data-timestamp'): atributos_a_quitar.append(attr) # Si el valor del atributo contiene un timestamp claro, también val = tag.attrs.get(attr) if isinstance(val, str) and re.search(r'\d{10,13}', val): atributos_a_quitar.append(attr) for attr in atributos_a_quitar: del tag.attrs[attr] texto = soup.get_text(separator=' ', strip=True) # Eliminar patrones de fecha/hora dinámicos texto = re.sub( r'(?:\d{1,2}/\d{1,2}/\d{2,4}|\d{1,2}:\d{2}(?::\d{2})?|hace\s+\d+\s+\w+)', '', texto ) # Normalizar espacios texto = re.sub(r'\s+', ' ', texto).strip() return texto def _extraer_enlaces_documentos(self, soup: BeautifulSoup, base_url: str) -> list: """Extrae todos los enlaces a PDF/DOCX/DOC de la página.""" archivos = [] vistos = set() for a in soup.find_all('a', href=True): href = a['href'] url_absoluta = urljoin(base_url, href) if url_absoluta in vistos: continue # Detectar extensión path = urlparse(url_absoluta).path.lower() ext = os.path.splitext(path)[1] if ext not in EXTENSIONES_DOCUMENTO: continue vistos.add(url_absoluta) nombre = unquote(os.path.basename(path)) or a.get_text(strip=True)[:100] tipo = ext.lstrip('.') archivos.append(ArchivoNuevo( url=url_absoluta, nombre=nombre, tipo=tipo, )) return archivos def _heuristica_tipo_archivo(self, nombre: str) -> Optional[str]: """Decide el tipo lógico de un archivo por su nombre.""" for regex, tipo in PATRONES_TIPO_ARCHIVO: if regex.search(nombre): return tipo return None def _cargar_archivos_anteriores(self, id_enlace: str) -> list: """URLs de archivos detectados en visitas anteriores del enlace.""" with self.conn.cursor() as cur: cur.execute(""" SELECT url_archivo, nombre_archivo, tipo FROM archivos_detectados WHERE id_enlace = %s """, (id_enlace,)) return [ {'url': r[0], 'nombre': r[1], 'tipo': r[2]} for r in cur.fetchall() ] # ======================================================================== # CLI # ======================================================================== if __name__ == '__main__': import sys if len(sys.argv) < 2: print("Web Watcher — Buscador Multisearch") print("") print("Uso:") print(" python 16_web_watcher.py --monitorizar [max_enlaces]") print(" python 16_web_watcher.py --procesar-archivos [max_archivos]") print(" python 16_web_watcher.py --test-url ") sys.exit(0) if sys.argv[1] == '--test-url': # Prueba aislada sin BD url = sys.argv[2] print(f"Probando URL: {url}") session = requests.Session() session.headers.update({'User-Agent': USER_AGENT}) try: r = session.get(url, timeout=TIMEOUT_HTTP) soup = BeautifulSoup(r.content, 'html.parser') print(f"HTTP {r.status_code}, {len(r.content)} bytes") watcher = WebWatcher.__new__(WebWatcher) archivos = watcher._extraer_enlaces_documentos(soup, url) print(f"\nArchivos detectados: {len(archivos)}") for a in archivos[:20]: print(f" [{a.tipo}] {a.nombre}") print(f" {a.url}") except Exception as e: print(f"Error: {e}") else: print("Para ejecutar con BD, importa WebWatcher en tu pipeline.")