// mqtt/client.js const mqtt = require('mqtt'); const EventEmitter = require('events'); const db = require('../db'); const { sendPushToUser } = require('../utils/pushService'); require('dotenv').config(); const emitter = new EventEmitter(); const MQTT_URL = process.env.MQTT_URL || 'mqtt://localhost:1883'; const mqttUser = process.env.MQTT_USER || 'admin'; const mqttPass = process.env.MQTT_PASS || '123QWEasd'; const client = mqtt.connect(MQTT_URL, { username: mqttUser, password: mqttPass, reconnectPeriod: 2000, }); // -------------------- // Helpers // -------------------- const lastEnabled = {}; // por chargerId function getStatusFromStateCode(code) { const map = { A1: '🔌 Not Conn.', B1: '🟡 Unauth.', B2: '🟢 Ready', C1: '⚡ Wait', C2: '⚡ Charging', D1: '💨 Vent (req)', D2: '💨 Vent', E: '❌ CP Error', F: '⚠️ Fault', }; return map[code] || '❓ Unknown'; } function getTriple(arr) { return Array.isArray(arr) ? arr.slice(0, 3).map((n) => Math.round((Number(n) || 0) * 10) / 10) : [0, 0, 0]; } function toOneDecimal(v) { const n = Number(v); if (!Number.isFinite(n)) return 0; return Math.round(n * 10) / 10; } function shallowEqual(a, b) { if (a === b) return true; if (!a || !b) return false; const ak = Object.keys(a); const bk = Object.keys(b); if (ak.length !== bk.length) return false; for (const k of ak) { const av = a[k]; const bv = b[k]; if (Array.isArray(av) || Array.isArray(bv)) { if (!Array.isArray(av) || !Array.isArray(bv)) return false; if (av.length !== bv.length) return false; for (let i = 0; i < av.length; i++) { if (av[i] !== bv[i]) return false; } } else if (av !== bv) { return false; } } return true; } // -------------------- // Caches // -------------------- // último estado normalizado por chargerId const lastStateByChargerId = new Map(); // chargerId -> normalizedState // cache charger por mqttTopic (evita SELECT a cada msg) const chargerCache = new Map(); // mqttTopic -> { charger, fetchedAt } const CHARGER_CACHE_TTL_MS = Number(process.env.CHARGER_CACHE_TTL_MS || 30000); async function getChargerByMqttTopic(mqttTopic) { const cached = chargerCache.get(mqttTopic); const now = Date.now(); if (cached && now - cached.fetchedAt < CHARGER_CACHE_TTL_MS) { return cached.charger; } const charger = await db('chargers').where({ mqtt_topic: mqttTopic }).first(); if (charger) { chargerCache.set(mqttTopic, { charger, fetchedAt: now }); } return charger; } // -------------------- // Subscribe // -------------------- client.on('connect', () => { console.log('[MQTT] Conectado ao broker:', MQTT_URL); // ✅ tópicos fixos que TU pediste manter const fixedTopics = ['+/state', '+/response/config/evse']; // opcional via env (mas não substitui os fixos) const envTopics = (process.env.MQTT_SUB_TOPICS || '') .split(',') .map((s) => s.trim()) .filter(Boolean); const topicsToSub = [...new Set([...fixedTopics, ...envTopics])]; topicsToSub.forEach((t) => { client.subscribe(t, { qos: 0 }, (err, granted) => { if (err) { console.error('[MQTT] Falha ao subscrever', t, err.message); } else { console.log('[MQTT] Subscrito:', granted?.map((g) => g.topic).join(', ') || t); } }); }); }); // -------------------- // Messages // -------------------- client.on('message', async (topic, message) => { // LOG para garantir que está a consumir console.log('[MQTT] msg recebida em:', topic); const parts = topic.split('/'); const mqttTopic = parts[0]; // ex: bf92842c365a const subtopic = parts.slice(1).join('/'); // ex: state ou response/config/evse // -------------------- // STATE // -------------------- if (subtopic === 'state') { try { const payload = JSON.parse(message.toString()); const charger = await getChargerByMqttTopic(mqttTopic); if (!charger) { console.warn(`[MQTT] Charger não encontrado para topic: ${mqttTopic}`); return; } const chargerId = charger.id; const now = new Date(); const stateCode = String(payload.state || '').split(' ')[0]; if (!stateCode) { console.warn(`[MQTT] Estado ausente/inválido para charger ID ${chargerId}`); return; } const status = getStatusFromStateCode(stateCode); const [p1, p2, p3] = getTriple(payload.power); const [v1, v2, v3] = getTriple(payload.voltage); const [c1, c2, c3] = getTriple(payload.current); const consumption = toOneDecimal(payload.consumption); const chargingTime = toOneDecimal( payload.chargingTime ?? payload.sessionTime ); const normalizedState = { status, charging_current: c1, consumption, charging_time: chargingTime, power_l1: p1, power_l2: p2, power_l3: p3, voltage_l1: v1, voltage_l2: v2, voltage_l3: v3, current_l1: c1, current_l2: c2, current_l3: c3, }; const prevState = lastStateByChargerId.get(chargerId); const changed = !prevState || !shallowEqual(prevState, normalizedState); if (changed) { await db('chargers') .where({ id: chargerId }) .update({ ...normalizedState, updated_at: now.toISOString(), }); lastStateByChargerId.set(chargerId, normalizedState); console.log(`[DB] Estado atualizado para charger ID ${chargerId}`); } else { // só para debug — podes remover depois console.log(`[MQTT] Estado repetido, sem write (charger ${chargerId})`); } // sessões start/stop (mantido) const previouslyEnabled = lastEnabled[chargerId] || false; const currentlyEnabled = stateCode === 'C2'; if (!previouslyEnabled && currentlyEnabled) { const activeSession = await db('charger_sessions') .where({ charger_id: chargerId }) .whereNull('ended_at') .first(); if (!activeSession) { await db('charger_sessions').insert({ charger_id: chargerId, started_at: now, kwh: consumption, }); console.log(`[DB] Sessão iniciada para charger ID ${chargerId}`); } } if (previouslyEnabled && !currentlyEnabled) { const session = await db('charger_sessions') .where({ charger_id: chargerId }) .whereNull('ended_at') .first(); if (session) { await db('charger_sessions') .where({ id: session.id }) .update({ ended_at: now, kwh: consumption }); console.log(`[DB] Sessão finalizada para charger ID ${chargerId}`); } } lastEnabled[chargerId] = currentlyEnabled; // emit para socket emitter.emit('charging-status', { charger_id: chargerId, mqtt_topic: mqttTopic, status, stateCode, consumption, chargingTime, power: [p1, p2, p3], voltage: [v1, v2, v3], current: [c1, c2, c3], raw: payload, }); // push (mantido) if (status === '⚠️ Fault' || status === '❌ CP Error') { await sendPushToUser(charger.user_id, { title: '⚠️ Erro no carregador', body: `${charger.location || 'Carregador'} entrou em falha.`, url: `/charger/${charger.id}`, }); } if (previouslyEnabled && !currentlyEnabled) { await sendPushToUser(charger.user_id, { title: '✅ Carregamento concluído', body: `${charger.location || 'Carregador'} terminou o carregamento.`, url: `/history`, }); } } catch (err) { console.error(`[MQTT] Erro ao processar state '${mqttTopic}':`, err); console.error('Payload recebido:', message.toString()); } } // -------------------- // CONFIG RESPONSE // -------------------- else if (subtopic === 'response/config/evse') { try { const payload = JSON.parse(message.toString()); const charger = await getChargerByMqttTopic(mqttTopic); if (!charger) { console.warn(`[MQTT] Charger não encontrado para topic: ${mqttTopic}`); return; } const configData = { charger_id: charger.id, max_charging_current: payload.maxChargingCurrent || 32, require_auth: !!payload.requireAuth, rcm_enabled: !!payload.rcm, temperature_limit: payload.temperatureThreshold || 60, config_received_at: new Date().toISOString(), }; const existingConfig = await db('charger_configs') .where({ charger_id: charger.id }) .first(); const prevCfgNorm = existingConfig ? { max_charging_current: Number(existingConfig.max_charging_current) || 32, require_auth: !!existingConfig.require_auth, rcm_enabled: !!existingConfig.rcm_enabled, temperature_limit: Number(existingConfig.temperature_limit) || 60, } : null; const nextCfgNorm = { max_charging_current: Number(configData.max_charging_current) || 32, require_auth: !!configData.require_auth, rcm_enabled: !!configData.rcm_enabled, temperature_limit: Number(configData.temperature_limit) || 60, }; const cfgChanged = !prevCfgNorm || !shallowEqual(prevCfgNorm, nextCfgNorm); if (cfgChanged) { if (existingConfig) { await db('charger_configs') .where({ charger_id: charger.id }) .update(configData); console.log(`[DB] Configuração atualizada para charger ID ${charger.id}`); } else { await db('charger_configs').insert(configData); console.log(`[DB] Nova configuração inserida para charger ID ${charger.id}`); } } else { console.log(`[MQTT] Config repetida, sem write (charger ${charger.id})`); } emitter.emit('charging-config', { ...configData, mqtt_topic: mqttTopic, raw: payload, }); } catch (err) { console.error(`[MQTT] Erro ao processar config '${mqttTopic}':`, err); console.error('Payload recebido:', message.toString()); } } }); // -------------------- // Broker offline / checker (igual ao teu) // -------------------- client.on('offline', async () => { console.warn('[MQTT] Broker offline'); try { const chargers = await db('chargers').select('id', 'user_id', 'location'); const uniqueUsers = [...new Set(chargers.map((c) => c.user_id))]; await Promise.allSettled( uniqueUsers.map((userId) => sendPushToUser(userId, { title: '📡 Broker MQTT offline', body: 'O sistema perdeu ligação ao broker. Alguns estados podem estar desatualizados.', url: '/', }) ) ); } catch (err) { console.error('[MQTT] erro offline push:', err.message); } }); setInterval(async () => { try { const timeoutMinutes = Number(process.env.CHARGER_OFFLINE_MINUTES || 5); const limitDate = new Date(Date.now() - timeoutMinutes * 60 * 1000); const offlineChargers = await db('chargers') .where('updated_at', '<', limitDate.toISOString()) .andWhereNot({ status: 'offline' }) .select('*'); for (const ch of offlineChargers) { await db('chargers') .where({ id: ch.id }) .update({ status: 'offline' }); lastStateByChargerId.delete(ch.id); lastEnabled[ch.id] = false; await sendPushToUser(ch.user_id, { title: '🔌 Carregador offline', body: `${ch.location || 'Carregador'} está offline há mais de ${timeoutMinutes} min.`, url: `/charger/${ch.id}`, }); } } catch (err) { console.error('[MQTT] offline checker erro:', err.message); } }, 60 * 1000); // -------------------- // API pública // -------------------- function on(event, handler) { emitter.on(event, handler); } function sendConfig(chargerTopic, property, value) { const payload = { [property]: value }; client.publish(`${chargerTopic}/set/config/evse`, JSON.stringify(payload), { qos: 1, }); } function sendEnable(chargerTopic, enable) { client.publish( `${chargerTopic}/enable`, JSON.stringify({ enable: !!enable }), { qos: 1 } ); } function requestConfig(chargerTopic) { client.publish(`${chargerTopic}/request/config/evse`, null, { qos: 1 }); } module.exports = { on, sendConfig, sendEnable, requestConfig, };