Dashboard / Admin / Name Servers / Documentație Query Logs
Pentru a obține statistici DNS precise per-domeniu în timp real, este necesar să configurați query logging pe fiecare nameserver. Există două metode disponibile:
Script Python care parsează log-urile și trimite automat statisticile la Panel prin API. Nu necesită web server suplimentar.
Panel-ul se conectează direct prin SSH și citește log-urile. Necesită chei SSH configurate.
Script PHP cu web server (lighttpd) care expune log-urile prin HTTP. Panel-ul face pull la interval regular.
Pentru această metodă, veți avea nevoie de cheia API configurată în Panel:
Editați fișierul de configurare PowerDNS și activați logging-ul pentru query-uri:
sudo nano /etc/powerdns/pdns.conf
# Adăugați sau modificați:
query-logging=yes
loglevel=6
log-dns-queries=yes
Reporniți PowerDNS și verificați:
sudo systemctl restart pdns
tail -f /var/log/syslog | grep "pdns.*wants"
# Ubuntu/Debian
sudo apt update
sudo apt install python3 python3-requests -y
# Sau cu pip
sudo apt install python3 python3-pip -y
pip3 install requests
Creați directorul și fișierul script:
sudo mkdir -p /opt/dns-stats
sudo nano /opt/dns-stats/parse-and-send.py
Copiați următorul conținut în fișier:
#!/usr/bin/env python3
"""
DNS Query Statistics Parser & Sender
Parsează log-urile PowerDNS și trimite statisticile la Panel
"""
import re
import subprocess
import requests
import socket
from datetime import datetime
from collections import defaultdict
import json
import os
# ============ CONFIGURAȚIE ============
PANEL_URL = "{{ config('app.url') }}/api/dns-stats/receive"
API_KEY = "{{ config('services.dns_stats.receiver_key', 'YOUR_API_KEY_HERE') }}"
LOG_FILE = "/var/log/syslog"
LINES_TO_READ = 10000 # Câte linii să citească din log
STATE_FILE = "/opt/dns-stats/.last_position"
# ======================================
def get_nameserver_name():
"""Obține numele nameserver-ului din hostname"""
hostname = socket.gethostname()
# Încearcă să găsească ns1, ns2, etc în hostname
match = re.search(r'ns\d+', hostname.lower())
if match:
return match.group()
return hostname
def read_new_log_lines():
"""Citește ultimele linii din log folosind tail"""
try:
result = subprocess.run(
['tail', '-n', str(LINES_TO_READ), LOG_FILE],
capture_output=True, text=True, timeout=30
)
return result.stdout.splitlines()
except Exception as e:
print(f"Eroare la citirea log-ului: {e}")
return []
def parse_dns_queries(lines):
"""
Parsează liniile de log și extrage statisticile per domeniu
Exemplu linie: "Dec 19 10:15:23 ns1 pdns[1234]: Remote 1.2.3.4 wants 'example.com|A'"
"""
stats = defaultdict(lambda: {'total': 0, 'types': defaultdict(int)})
# Pattern pentru query-uri PowerDNS
pattern = re.compile(
r"pdns\[\d+\]:\s+.*?Remote\s+[\d\.]+(?::\d+)?\s+wants\s+'([^|]+)\|([A-Z]+)",
re.IGNORECASE
)
for line in lines:
if 'pdns' not in line or 'wants' not in line:
continue
match = pattern.search(line)
if match:
domain = match.group(1).lower().rstrip('.')
qtype = match.group(2).upper()
# Normalizează domeniul (elimină subdomenii pentru statistici)
# ex: www.example.com -> example.com
parts = domain.split('.')
if len(parts) > 2:
# Păstrează ultimele 2 părți pentru TLD-uri simple
# sau ultimele 3 pentru TLD-uri compuse (.co.uk, etc)
if parts[-2] in ['co', 'com', 'org', 'net', 'gov', 'edu']:
root_domain = '.'.join(parts[-3:])
else:
root_domain = '.'.join(parts[-2:])
else:
root_domain = domain
stats[root_domain]['total'] += 1
stats[root_domain]['types'][qtype] += 1
return dict(stats)
def send_to_panel(stats):
"""Trimite statisticile la Panel prin API"""
if not stats:
print("Nu sunt statistici de trimis")
return True
nameserver = get_nameserver_name()
payload = {
'api_key': API_KEY,
'nameserver': nameserver,
'timestamp': datetime.now().isoformat(),
'stats': stats
}
try:
response = requests.post(
PANEL_URL,
json=payload,
headers={'Content-Type': 'application/json'},
timeout=30
)
if response.status_code == 200:
result = response.json()
print(f"✓ Trimis: {len(stats)} domenii, {result.get('processed', 0)} procesate")
return True
else:
print(f"✗ Eroare HTTP {response.status_code}: {response.text}")
return False
except requests.exceptions.RequestException as e:
print(f"✗ Eroare conexiune: {e}")
return False
def main():
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Pornire colectare statistici...")
# Citește log-urile
lines = read_new_log_lines()
print(f" Citite {len(lines)} linii din log")
# Parsează query-urile
stats = parse_dns_queries(lines)
print(f" Găsite {len(stats)} domenii unice")
# Trimite la Panel
if stats:
send_to_panel(stats)
else:
print(" Nu sunt query-uri DNS în ultimele log-uri")
if __name__ == '__main__':
main()
Faceți scriptul executabil:
sudo chmod +x /opt/dns-stats/parse-and-send.py
Rulați scriptul manual pentru a verifica că funcționează:
sudo python3 /opt/dns-stats/parse-and-send.py
Output așteptat:
[2024-12-19 10:30:45] Pornire colectare statistici... Citite 10000 linii din log Găsite 45 domenii unice ✓ Trimis: 45 domenii, 45 procesate
Adăugați un cron job pentru a rula scriptul la fiecare 5 minute:
sudo crontab -e
# Adăugați linia:
*/5 * * * * /usr/bin/python3 /opt/dns-stats/parse-and-send.py >> /var/log/dns-stats.log 2>&1
Opțional: Puteți ajusta frecvența la */1 pentru actualizări la minut
sau */15 pentru actualizări la 15 minute.
După câteva minute, statisticile ar trebui să apară în:
Verificați că URL-ul Panel-ului este accesibil din VPS: curl -I {{ config('app.url') }}
Cheia API din script nu corespunde cu cea din Panel. Verificați valoarea DNS_STATS_RECEIVER_KEY din .env
Verificați că query logging este activat: grep "wants" /var/log/syslog | head -5
Dacă nu vedeți "wants" în log-uri, verificați configurația PowerDNS și reporniți serviciul.
tail -f /var/log/dns-stats.log