""" ======================================================================== Motor de reglas de notificación Buscador Multisearch — Entrega 5 (sistema de notificaciones) ======================================================================== Materializa la lógica de evaluación de las reglas configuradas en la tabla `reglas_notificacion` (07_db_alertas_y_reglas.sql). Cada vez que el buscador detecta un evento (nuevo hit clasificado por fase), este motor: 1. Determina qué usuarios deben recibir notificación (suscriptores directos al proceso + suscriptores a la categoría temática + reglas personalizadas que matcheen). 2. Para cada usuario, decide por qué canales enviarle (diario, inmediato, dashboard, ical) según sus preferencias. 3. Aplica filtros de protección: silencios manuales, antispam por volumen, reglas suspendidas por abuso. 4. Persiste en `notificaciones_enviadas` las que pasan el filtro, listas para que el sender de emails las procese. Punto de entrada principal: motor = MotorReglas(conn) motor.procesar_evento(id_evento_notif='abc-123-...') Diseño: - Reglas en JSONB con un mini-DSL declarativo, no código embebido (evita riesgos de inyección y permite editar reglas desde UI). - Operadores: equals, in, contains, not_equals, gt, lt, between, regex. - Combinadores lógicos: all (AND), any (OR), not. - Cada operador es una función Python testeable de forma aislada. - El motor es **idempotente**: ejecutar 2 veces el mismo evento no duplica notificaciones (constraint UNIQUE en BD). Dependencias: pip install psycopg2-binary pyyaml python-dateutil """ import re import json import logging from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone from typing import Any, Optional, Union from uuid import UUID logger = logging.getLogger(__name__) # ======================================================================== # Estructuras de datos # ======================================================================== @dataclass class EventoNotificacion: """Replica de la fila de eventos_notificacion necesaria para evaluar.""" id_evento_notif: str id_proceso: str id_evento_proceso: Optional[str] tipo: str fase: Optional[str] numero_fase: Optional[int] modificador: Optional[str] urgencia: str datos: dict = field(default_factory=dict) @dataclass class ContextoProceso: """Datos del proceso al que pertenece el evento, para evaluar reglas.""" id_proceso: str administracion_codigo: str administracion_nombre: str cuerpo_codigo: str cuerpo_nombre: str cuerpo_codigo_oficial: Optional[str] grupo: Optional[str] categoria_tematica: str anio_oep: int turno: str num_plazas_total: Optional[int] estado: str @dataclass class Usuario: """Datos mínimos de un usuario para evaluar destinatarios.""" id_usuario: str email: str rol: str organizacion: Optional[str] estado: str = 'activo' @dataclass class Notificacion: """Una notificación lista para enviar (antes de persistir en BD).""" id_usuario: str id_evento_notif: str canal: str # 'email_diario', 'email_inmediato', 'dashboard_alerta', 'ical' id_regla_origen: Optional[str] = None razon: str = '' # explicación humana de por qué se notifica # ======================================================================== # Evaluador de condiciones (mini-DSL declarativo) # ======================================================================== class EvaluadorCondiciones: """Evalúa una condición JSONB contra un contexto de evento+proceso. El DSL es declarativo. Cada nodo es un dict con una clave que indica el operador. Ejemplos: {"fase": "bases"} equivale a: contexto.fase == "bases" {"administracion": {"in": ["aeat", "agencia_tributaria"]}} equivale a: contexto.administracion in [...] {"all": [ {"fase": "bases"}, {"categoria_tematica": "hacienda"} ]} equivale a: fase == bases AND categoria == hacienda {"any": [ {"administracion": "ayto_madrid"}, {"administracion": "ayto_barcelona"} ]} {"not": {"estado": "archivado"}} {"porcentaje_cambio_temario": {"gt": 20}} accede a contexto.datos['porcentaje_cambio_temario'] """ # Operadores soportados OPERADORES = { 'equals', 'eq', 'not_equals', 'ne', 'in', 'not_in', 'nin', 'contains', 'starts_with', 'ends_with', 'gt', 'gte', 'lt', 'lte', 'between', 'regex', } # Combinadores lógicos COMBINADORES = {'all', 'any', 'not'} def evaluar(self, condicion: dict, contexto: dict) -> bool: """Evalúa una condición contra un contexto plano (dict).""" if not isinstance(condicion, dict): raise ValueError(f"Condición debe ser dict, recibido: {type(condicion)}") # Casos especiales: combinadores if 'all' in condicion: return all(self.evaluar(c, contexto) for c in condicion['all']) if 'any' in condicion: return any(self.evaluar(c, contexto) for c in condicion['any']) if 'not' in condicion: return not self.evaluar(condicion['not'], contexto) # Caso normal: cada clave del dict es un campo del contexto for campo, esperado in condicion.items(): if campo in self.COMBINADORES: continue # ya tratado arriba valor = self._obtener_valor(contexto, campo) if not self._coincide(valor, esperado): return False return True def _obtener_valor(self, contexto: dict, campo: str) -> Any: """Obtiene un campo del contexto, soporta notación punto: 'datos.porcentaje_cambio_temario' → contexto['datos']['porcentaje_cambio_temario'] """ partes = campo.split('.') valor = contexto for parte in partes: if isinstance(valor, dict) and parte in valor: valor = valor[parte] else: return None return valor def _coincide(self, valor: Any, esperado: Any) -> bool: """Compara el valor del contexto con el esperado de la condición.""" # Caso simple: esperado es escalar → equals directo if not isinstance(esperado, dict): return valor == esperado # Esperado es un dict con un operador # {"in": ["a", "b"]} o {"gt": 10} o {"regex": ".*madrid.*"} for operador, arg in esperado.items(): if operador not in self.OPERADORES: raise ValueError(f"Operador desconocido: {operador}") if not self._aplicar_operador(operador, valor, arg): return False return True def _aplicar_operador(self, op: str, valor: Any, arg: Any) -> bool: """Aplica un operador concreto entre el valor del contexto y el arg.""" if op in ('equals', 'eq'): return valor == arg if op in ('not_equals', 'ne'): return valor != arg if op == 'in': return valor in arg if arg else False if op in ('not_in', 'nin'): return valor not in arg if arg else True if op == 'contains': if valor is None: return False return arg in valor if op == 'starts_with': return isinstance(valor, str) and valor.startswith(arg) if op == 'ends_with': return isinstance(valor, str) and valor.endswith(arg) if op == 'gt': return valor is not None and valor > arg if op == 'gte': return valor is not None and valor >= arg if op == 'lt': return valor is not None and valor < arg if op == 'lte': return valor is not None and valor <= arg if op == 'between': return valor is not None and arg[0] <= valor <= arg[1] if op == 'regex': return isinstance(valor, str) and bool(re.search(arg, valor)) return False # ======================================================================== # Motor principal # ======================================================================== class MotorReglas: """Procesa un evento de notificación y decide qué notificaciones crear según reglas configuradas y preferencias de usuarios.""" # Antispam: máximo de notificaciones de un mismo tipo a un usuario # antes de empezar a silenciar LIMITE_ANTISPAM_HORA = 10 LIMITE_ANTISPAM_DIA = 30 def __init__(self, conn): """conn: conexión psycopg2 a PostgreSQL.""" self.conn = conn self.evaluador = EvaluadorCondiciones() # -------------------------------------------------------------- # Punto de entrada principal # -------------------------------------------------------------- def procesar_evento(self, id_evento_notif: str) -> list: """Procesa un evento y crea las notificaciones que correspondan. Returns: Lista de Notificacion creadas (ya persistidas en BD). """ # 1. Cargar el evento y su contexto evento = self._cargar_evento(id_evento_notif) if not evento: logger.warning(f"Evento {id_evento_notif} no encontrado") return [] proceso = self._cargar_proceso(evento.id_proceso) if not proceso: logger.warning(f"Proceso {evento.id_proceso} no encontrado") return [] # 2. Determinar destinatarios potenciales destinatarios = self._determinar_destinatarios(evento, proceso) # 3. Para cada destinatario, decidir qué canales aplican notificaciones = [] for usuario, canal_sugerido, id_regla_origen, razon in destinatarios: # 3a. Verificar silencios manuales if self._esta_silenciado(usuario, evento, proceso, canal_sugerido): logger.debug(f"Usuario {usuario.id_usuario} ha silenciado este tipo") continue # 3b. Verificar antispam (volumen reciente) if not self._pasa_antispam(usuario, evento, canal_sugerido): logger.info(f"Antispam: usuario {usuario.id_usuario} supera límite " f"para canal {canal_sugerido}") continue # 3c. Verificar duplicados (idempotencia) if self._ya_notificado(usuario, evento, canal_sugerido): logger.debug(f"Ya notificado: {usuario.id_usuario}/{evento.id_evento_notif}/{canal_sugerido}") continue notif = Notificacion( id_usuario=usuario.id_usuario, id_evento_notif=evento.id_evento_notif, canal=canal_sugerido, id_regla_origen=id_regla_origen, razon=razon, ) notificaciones.append(notif) # 4. Persistir todas las notificaciones de golpe self._persistir_notificaciones(notificaciones) # 5. Marcar el evento como procesado self._marcar_evento_procesado(evento.id_evento_notif) return notificaciones # -------------------------------------------------------------- # Carga de datos # -------------------------------------------------------------- def _cargar_evento(self, id_evento_notif: str) -> Optional[EventoNotificacion]: with self.conn.cursor() as cur: cur.execute(""" SELECT id_evento_notif, id_proceso, id_evento_proceso, tipo, fase, numero_fase, modificador, urgencia, datos FROM eventos_notificacion WHERE id_evento_notif = %s """, (id_evento_notif,)) row = cur.fetchone() if not row: return None return EventoNotificacion( id_evento_notif=str(row[0]), id_proceso=row[1], id_evento_proceso=str(row[2]) if row[2] else None, tipo=row[3], fase=row[4], numero_fase=row[5], modificador=row[6], urgencia=row[7], datos=row[8] or {}, ) def _cargar_proceso(self, id_proceso: str) -> Optional[ContextoProceso]: with self.conn.cursor() as cur: cur.execute(""" SELECT id_proceso, administracion_codigo, administracion_nombre, cuerpo_codigo, cuerpo_nombre, cuerpo_codigo_oficial, grupo, categoria_tematica, anio_oep, turno, num_plazas_total, estado FROM procesos WHERE id_proceso = %s """, (id_proceso,)) row = cur.fetchone() if not row: return None return ContextoProceso( id_proceso=row[0], administracion_codigo=row[1], administracion_nombre=row[2], cuerpo_codigo=row[3], cuerpo_nombre=row[4], cuerpo_codigo_oficial=row[5], grupo=row[6], categoria_tematica=row[7], anio_oep=row[8], turno=row[9], num_plazas_total=row[10], estado=row[11], ) def _cargar_usuario(self, id_usuario: str) -> Optional[Usuario]: with self.conn.cursor() as cur: cur.execute(""" SELECT id_usuario, email, rol, organizacion, estado FROM usuarios WHERE id_usuario = %s """, (id_usuario,)) row = cur.fetchone() if not row: return None return Usuario( id_usuario=str(row[0]), email=row[1], rol=row[2], organizacion=row[3], estado=row[4], ) # -------------------------------------------------------------- # Determinar destinatarios # -------------------------------------------------------------- def _determinar_destinatarios( self, evento: EventoNotificacion, proceso: ContextoProceso ) -> list: """Devuelve una lista de tuplas (usuario, canal, id_regla, razon). Combina tres fuentes: a) Suscriptores directos al proceso (con sus preferencias por proceso). b) Suscriptores a la categoría temática (con sus preferencias globales). c) Reglas avanzadas que matcheen (personales, de organización, sistema). """ destinatarios = [] ya_anadidos = set() # (id_usuario, canal) para evitar duplicados # a) Suscriptores directos al proceso for usuario, canal in self._suscriptores_directos(evento, proceso): key = (usuario.id_usuario, canal) if key not in ya_anadidos: ya_anadidos.add(key) destinatarios.append(( usuario, canal, None, f"Sigues el proceso «{proceso.cuerpo_nombre} — {proceso.administracion_nombre}»" )) # b) Suscriptores a la categoría temática for usuario, canal in self._suscriptores_categoria(evento, proceso): key = (usuario.id_usuario, canal) if key not in ya_anadidos: ya_anadidos.add(key) destinatarios.append(( usuario, canal, None, f"Estás suscrito a la categoría «{proceso.categoria_tematica}»" )) # c) Reglas avanzadas for usuario, canal, id_regla, razon in self._aplicar_reglas(evento, proceso): key = (usuario.id_usuario, canal) if key not in ya_anadidos: ya_anadidos.add(key) destinatarios.append((usuario, canal, id_regla, razon)) return destinatarios def _suscriptores_directos(self, evento, proceso) -> list: """Usuarios que siguen este proceso concretamente.""" resultado = [] with self.conn.cursor() as cur: cur.execute(""" SELECT u.id_usuario, u.email, u.rol, u.organizacion, u.estado, sp.email_diario, sp.email_inmediato, sp.incluir_en_ical, pn.email_inmediato_activo, pn.inmediato_fase_oep, pn.inmediato_fase_bases, pn.inmediato_fase_presentacion, pn.inmediato_fase_lista_prov, pn.inmediato_fase_lista_def, pn.inmediato_fase_fecha_examen, pn.inmediato_fase_desarrollo, pn.inmediato_fase_resultados, pn.inmediato_fase_aprobados, pn.inmediato_fase_nombramiento, pn.inmediato_correccion_errores, pn.inmediato_ampliacion_plazo FROM suscripciones_proceso sp JOIN usuarios u ON u.id_usuario = sp.id_usuario LEFT JOIN preferencias_notificacion pn ON pn.id_usuario = u.id_usuario WHERE sp.id_proceso = %s AND sp.fecha_baja IS NULL AND u.estado = 'activo' """, (evento.id_proceso,)) for row in cur.fetchall(): usuario = Usuario( id_usuario=str(row[0]), email=row[1], rol=row[2], organizacion=row[3], estado=row[4] ) email_diario_proc = row[5] email_inmediato_proc = row[6] # incluir_en_ical = row[7] # no se usa aquí, sí en ICS email_inmediato_activo = row[8] if row[8] is not None else True # Email diario: siempre que el usuario lo tenga activo # (no diferenciamos por fase, el diario lleva todo) if email_diario_proc: resultado.append((usuario, 'email_diario')) # Email inmediato: solo si el usuario quiere inmediatos # Y la fase concreta está marcada en sus preferencias if email_inmediato_proc and email_inmediato_activo: if self._fase_quiere_inmediato(evento, row, idx_start=9): resultado.append((usuario, 'email_inmediato')) # Dashboard: siempre se notifica (es pull, no molesta) resultado.append((usuario, 'dashboard_alerta')) return resultado def _suscriptores_categoria(self, evento, proceso) -> list: """Usuarios suscritos a la categoría temática del proceso.""" resultado = [] with self.conn.cursor() as cur: cur.execute(""" SELECT u.id_usuario, u.email, u.rol, u.organizacion, u.estado, pn.email_diario_activo, pn.email_inmediato_activo, pn.inmediato_fase_oep, pn.inmediato_fase_bases, pn.inmediato_fase_presentacion, pn.inmediato_fase_lista_prov, pn.inmediato_fase_lista_def, pn.inmediato_fase_fecha_examen, pn.inmediato_fase_desarrollo, pn.inmediato_fase_resultados, pn.inmediato_fase_aprobados, pn.inmediato_fase_nombramiento, pn.inmediato_correccion_errores, pn.inmediato_ampliacion_plazo FROM suscripciones s JOIN usuarios u ON u.id_usuario = s.id_usuario LEFT JOIN preferencias_notificacion pn ON pn.id_usuario = u.id_usuario WHERE s.categoria = %s AND s.activa = TRUE AND u.estado = 'activo' -- Excluir los que ya tienen suscripción directa AND NOT EXISTS ( SELECT 1 FROM suscripciones_proceso sp WHERE sp.id_usuario = u.id_usuario AND sp.id_proceso = %s AND sp.fecha_baja IS NULL ) """, (proceso.categoria_tematica, evento.id_proceso)) for row in cur.fetchall(): usuario = Usuario( id_usuario=str(row[0]), email=row[1], rol=row[2], organizacion=row[3], estado=row[4] ) email_diario_activo = row[5] if row[5] is not None else True email_inmediato_activo = row[6] if row[6] is not None else True if email_diario_activo: resultado.append((usuario, 'email_diario')) if email_inmediato_activo: if self._fase_quiere_inmediato(evento, row, idx_start=7): resultado.append((usuario, 'email_inmediato')) resultado.append((usuario, 'dashboard_alerta')) return resultado def _fase_quiere_inmediato(self, evento, row, idx_start: int) -> bool: """Comprueba si las preferencias del usuario incluyen esta fase en email inmediato. row[idx_start:idx_start+11] son los 11 flags de fase + 2 modificadores en orden: oep, bases, presentacion, lista_prov, lista_def, fecha_examen, desarrollo, resultados, aprobados, nombramiento, correccion_errores, ampliacion_plazo """ mapeo = { 'oep': idx_start, 'bases': idx_start + 1, 'presentacion_solicitudes': idx_start + 2, 'lista_provisional_admitidos': idx_start + 3, 'lista_definitiva_admitidos': idx_start + 4, 'fecha_lugar_examen': idx_start + 5, 'desarrollo_ejercicios': idx_start + 6, 'resultados': idx_start + 7, 'relacion_aprobados': idx_start + 8, 'nombramiento_posesion': idx_start + 9, } mapeo_mod = { 'correccion_errores': idx_start + 10, 'ampliacion_plazo': idx_start + 11, } # Modificador tiene prioridad if evento.modificador and evento.modificador in mapeo_mod: idx = mapeo_mod[evento.modificador] return bool(row[idx]) if idx < len(row) else False if evento.fase and evento.fase in mapeo: idx = mapeo[evento.fase] return bool(row[idx]) if idx < len(row) else False return False def _aplicar_reglas(self, evento, proceso) -> list: """Evalúa las reglas activas (personales, organizacionales, sistema) contra el contexto y devuelve los matches.""" resultado = [] # Contexto plano para el evaluador contexto = { # Del proceso 'administracion': proceso.administracion_codigo, 'administracion_nombre': proceso.administracion_nombre, 'cuerpo': proceso.cuerpo_codigo, 'cuerpo_nombre': proceso.cuerpo_nombre, 'cuerpo_codigo_oficial': proceso.cuerpo_codigo_oficial, 'grupo': proceso.grupo, 'categoria_tematica': proceso.categoria_tematica, 'categoria': proceso.categoria_tematica, # alias 'anio_oep': proceso.anio_oep, 'anio': proceso.anio_oep, # alias 'turno': proceso.turno, 'num_plazas_total': proceso.num_plazas_total, 'estado_proceso': proceso.estado, # Del evento 'tipo_evento': evento.tipo, 'fase': evento.fase, 'numero_fase': evento.numero_fase, 'modificador': evento.modificador, 'urgencia': evento.urgencia, 'datos': evento.datos, } with self.conn.cursor() as cur: cur.execute(""" SELECT id_regla, nombre, condicion, accion, id_usuario, organizacion, es_regla_sistema FROM reglas_notificacion WHERE activa = TRUE AND (suspendida_hasta IS NULL OR suspendida_hasta < NOW()) AND disparos_ultimas_24h < max_disparos_por_dia ORDER BY prioridad ASC """) reglas = cur.fetchall() for r in reglas: id_regla = str(r[0]) nombre = r[1] condicion = r[2] accion = r[3] id_usuario_regla = r[4] organizacion_regla = r[5] es_sistema = r[6] try: if not self.evaluador.evaluar(condicion, contexto): continue except Exception as e: logger.error(f"Error evaluando regla {nombre} ({id_regla}): {e}") continue # La regla dispara: incrementar contador self._incrementar_contador_regla(id_regla) # Determinar destinatarios según el ámbito de la regla usuarios_destino = self._destinatarios_de_regla( id_usuario_regla, organizacion_regla, es_sistema, accion ) # Determinar canales según la acción canales = accion.get('canales', ['email_inmediato']) if isinstance(canales, str): canales = [canales] razon = f"Regla: {nombre}" for usuario in usuarios_destino: for canal in canales: resultado.append((usuario, canal, id_regla, razon)) return resultado def _destinatarios_de_regla(self, id_usuario_regla, organizacion_regla, es_sistema, accion) -> list: """Determina a qué usuarios aplica una regla concreta.""" if id_usuario_regla: # Regla personal: solo aplica a ese usuario u = self._cargar_usuario(str(id_usuario_regla)) return [u] if u else [] # Para reglas de organización o sistema, los destinatarios # vienen en la acción (lista de emails) o se infieren del rol destinatarios_accion = accion.get('destinatarios', []) if not destinatarios_accion: return [] usuarios = [] with self.conn.cursor() as cur: if organizacion_regla: # Solo usuarios de esa organización cur.execute(""" SELECT id_usuario, email, rol, organizacion, estado FROM usuarios WHERE email = ANY(%s) AND organizacion = %s AND estado = 'activo' """, (destinatarios_accion, organizacion_regla)) else: # Sistema: cualquier usuario que matchee email cur.execute(""" SELECT id_usuario, email, rol, organizacion, estado FROM usuarios WHERE email = ANY(%s) AND estado = 'activo' """, (destinatarios_accion,)) for row in cur.fetchall(): usuarios.append(Usuario( id_usuario=str(row[0]), email=row[1], rol=row[2], organizacion=row[3], estado=row[4] )) return usuarios def _incrementar_contador_regla(self, id_regla: str) -> None: with self.conn.cursor() as cur: cur.execute(""" UPDATE reglas_notificacion SET disparos_ultimas_24h = CASE WHEN fecha_reset_contador < NOW() - INTERVAL '24 hours' THEN 1 ELSE disparos_ultimas_24h + 1 END, fecha_reset_contador = CASE WHEN fecha_reset_contador < NOW() - INTERVAL '24 hours' THEN NOW() ELSE fecha_reset_contador END, -- Auto-suspender si supera el límite suspendida_hasta = CASE WHEN disparos_ultimas_24h + 1 >= max_disparos_por_dia THEN NOW() + INTERVAL '24 hours' ELSE suspendida_hasta END WHERE id_regla = %s """, (id_regla,)) # -------------------------------------------------------------- # Filtros de envío # -------------------------------------------------------------- def _esta_silenciado(self, usuario, evento, proceso, canal) -> bool: """Comprueba si el usuario ha silenciado este tipo de notificación.""" with self.conn.cursor() as cur: cur.execute(""" SELECT 1 FROM silenciados WHERE id_usuario = %s AND (fecha_caducidad IS NULL OR fecha_caducidad > NOW()) AND ( (id_proceso = %s) OR (fase = %s) OR (modificador = %s) OR (categoria = %s) OR (canal = %s) ) LIMIT 1 """, ( usuario.id_usuario, evento.id_proceso, evento.fase, evento.modificador, proceso.categoria_tematica, canal )) return cur.fetchone() is not None def _pasa_antispam(self, usuario, evento, canal) -> bool: """Comprueba que el usuario no esté recibiendo demasiadas notificaciones del mismo tipo en poco tiempo.""" # Solo aplica a inmediato y push (el diario es 1 al día) if canal not in ('email_inmediato', 'push_web', 'sms'): return True with self.conn.cursor() as cur: # Contar notificaciones del mismo canal en la última hora cur.execute(""" SELECT COUNT(*) FROM notificaciones_enviadas WHERE id_usuario = %s AND canal = %s AND fecha_creacion > NOW() - INTERVAL '1 hour' AND estado IN ('enviada', 'pendiente') """, (usuario.id_usuario, canal)) count_hora = cur.fetchone()[0] cur.execute(""" SELECT max_inmediatos_por_hora, max_inmediatos_por_dia FROM preferencias_notificacion WHERE id_usuario = %s """, (usuario.id_usuario,)) row = cur.fetchone() limite_hora = row[0] if row else self.LIMITE_ANTISPAM_HORA limite_dia = row[1] if row else self.LIMITE_ANTISPAM_DIA if count_hora >= limite_hora: return False cur.execute(""" SELECT COUNT(*) FROM notificaciones_enviadas WHERE id_usuario = %s AND canal = %s AND fecha_creacion > NOW() - INTERVAL '24 hours' AND estado IN ('enviada', 'pendiente') """, (usuario.id_usuario, canal)) count_dia = cur.fetchone()[0] if count_dia >= limite_dia: return False return True def _ya_notificado(self, usuario, evento, canal) -> bool: """Idempotencia: ¿ya hemos creado esta notificación?""" with self.conn.cursor() as cur: cur.execute(""" SELECT 1 FROM notificaciones_enviadas WHERE id_usuario = %s AND id_evento_notif = %s AND canal = %s LIMIT 1 """, (usuario.id_usuario, evento.id_evento_notif, canal)) return cur.fetchone() is not None # -------------------------------------------------------------- # Persistencia # -------------------------------------------------------------- def _persistir_notificaciones(self, notificaciones: list) -> None: if not notificaciones: return with self.conn.cursor() as cur: datos = [ (n.id_evento_notif, n.id_usuario, n.id_regla_origen, n.canal, 'pendiente') for n in notificaciones ] cur.executemany(""" INSERT INTO notificaciones_enviadas (id_evento_notif, id_usuario, id_regla, canal, estado) VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING """, datos) self.conn.commit() def _marcar_evento_procesado(self, id_evento_notif: str) -> None: with self.conn.cursor() as cur: cur.execute(""" UPDATE eventos_notificacion SET estado = 'procesado', fecha_procesamiento = NOW() WHERE id_evento_notif = %s """, (id_evento_notif,)) self.conn.commit() # ======================================================================== # Tests unitarios del evaluador (no requieren BD) # ======================================================================== def _tests(): """Tests del evaluador de condiciones. Ejecutar con: python 11_motor_reglas.py --test """ print("Ejecutando tests del EvaluadorCondiciones...") e = EvaluadorCondiciones() # Tests básicos contexto = { 'fase': 'bases', 'administracion': 'ayto_madrid', 'cuerpo': 'tag', 'num_plazas_total': 15, 'datos': {'porcentaje_cambio_temario': 25.0}, 'turno': 'libre', } casos = [ # (descripción, condición, resultado_esperado) ("Match exacto", {'fase': 'bases'}, True), ("No match", {'fase': 'oep'}, False), ("AND ambas se cumplen", {'all': [{'fase': 'bases'}, {'cuerpo': 'tag'}]}, True), ("AND una falla", {'all': [{'fase': 'bases'}, {'cuerpo': 'otro'}]}, False), ("OR una se cumple", {'any': [{'cuerpo': 'otro'}, {'fase': 'bases'}]}, True), ("OR ninguna", {'any': [{'cuerpo': 'otro'}, {'fase': 'oep'}]}, False), ("NOT positivo", {'not': {'fase': 'oep'}}, True), ("NOT negativo", {'not': {'fase': 'bases'}}, False), ("Operador IN", {'administracion': {'in': ['ayto_madrid', 'ayto_barcelona']}}, True), ("Operador IN no match", {'administracion': {'in': ['ayto_sevilla', 'ayto_valencia']}}, False), ("Operador GT", {'num_plazas_total': {'gt': 10}}, True), ("Operador GT no match", {'num_plazas_total': {'gt': 100}}, False), ("Operador BETWEEN", {'num_plazas_total': {'between': [10, 20]}}, True), ("Notación punto", {'datos.porcentaje_cambio_temario': {'gt': 20}}, True), ("Notación punto valor no existe", {'datos.no_existe': 5}, False), ("Combinación compleja", {'all': [ {'fase': 'bases'}, {'any': [ {'cuerpo': 'tag'}, {'cuerpo': 'auxiliar_admin'} ]}, {'datos.porcentaje_cambio_temario': {'gt': 20}} ]}, True), ("Operador REGEX", {'administracion_nombre': {'regex': '.*'}}, False), # no está en contexto ("Operador CONTAINS en string", {'cuerpo': {'contains': 'ag'}}, True), ] errores = 0 for descripcion, condicion, esperado in casos: try: resultado = e.evaluar(condicion, contexto) if resultado == esperado: print(f" ✓ {descripcion}") else: print(f" ✗ {descripcion}: esperado={esperado}, obtenido={resultado}") errores += 1 except Exception as ex: print(f" ✗ {descripcion}: EXCEPCIÓN {type(ex).__name__}: {ex}") errores += 1 print(f"\n{'OK' if errores == 0 else f'{errores} FALLOS'} — {len(casos)} tests") return errores == 0 # ======================================================================== # Ejemplos de reglas en YAML # ======================================================================== EJEMPLO_REGLAS_YAML = """ # Ejemplo de reglas avanzadas que un admin de academia podría crear. # Estas se guardan en la tabla reglas_notificacion con el contenido # del campo `condicion` y `accion` como JSONB. - nombre: "Bases TAG Madrid → equipo de contenidos" descripcion: "Cuando salgan bases nuevas de TAG en el Ayto. de Madrid" condicion: all: - fase: bases - administracion: ayto_madrid - cuerpo: tag accion: canales: [email_inmediato, dashboard_alerta] destinatarios: - contenidos@academia.es - jefe.contenidos@academia.es asunto_template: "🟢 Bases TAG Madrid - revisar temario" - nombre: "Diff temario >20% → reescribir materiales" descripcion: "Si el temario cambia más de un 20%, alertar a contenidos" condicion: all: - tipo_evento: diff_temario_calculado - datos.porcentaje_cambio: { gt: 20 } accion: canales: [email_inmediato] destinatarios: [coordinacion.contenidos@academia.es] asunto_template: "⚠️ Cambios mayores en temario {cuerpo_nombre}" - nombre: "OEP en CCAA estratégica" descripcion: "Nueva OEP en una autonomía donde la academia opera" condicion: all: - fase: oep - administracion: in: [castilla_y_leon, c_valenciana, andalucia, madrid_ccaa] accion: canales: [email_inmediato] destinatarios: [direccion@academia.es, comercial@academia.es] asunto_template: "💡 Oportunidad: nueva OEP {administracion_nombre}" - nombre: "Convocatorias grandes (>200 plazas)" descripcion: "Convocatorias estatales grandes para captación masiva" condicion: all: - fase: bases - num_plazas_total: { gt: 200 } accion: canales: [email_inmediato, dashboard_alerta] destinatarios: [comercial@academia.es, marketing@academia.es] """ # ======================================================================== # CLI # ======================================================================== if __name__ == '__main__': import sys if '--test' in sys.argv: ok = _tests() sys.exit(0 if ok else 1) elif '--ejemplo-reglas' in sys.argv: print(EJEMPLO_REGLAS_YAML) else: print("Motor de reglas — Buscador Multisearch") print("") print("Uso:") print(" python 11_motor_reglas.py --test # ejecutar tests") print(" python 11_motor_reglas.py --ejemplo-reglas # imprimir reglas ejemplo") print("") print("Uso en código:") print(" motor = MotorReglas(conn)") print(" notificaciones = motor.procesar_evento(id_evento_notif='...')")