Сделал парсер, начал интеграцию с бд
This commit is contained in:
0
dbapp/lyngsatapp/__init__.py
Normal file
0
dbapp/lyngsatapp/__init__.py
Normal file
8
dbapp/lyngsatapp/admin.py
Normal file
8
dbapp/lyngsatapp/admin.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from django.contrib import admin
|
||||
from .models import LyngSat
|
||||
|
||||
@admin.register(LyngSat)
class LyngSatAdmin(admin.ModelAdmin):
    """Admin configuration for LyngSat source records.

    Every name used in the options below must be a field of the ``LyngSat``
    model (or a callable); Django's admin system checks reject names that
    do not resolve. The previous configuration referred to ``mark`` and
    ``timestamp``, which the model does not define.
    """

    # Columns of the change list: the satellite, the signal parameters and
    # the date of the last update.
    list_display = (
        "id_satellite",
        "frequency",
        "polarization",
        "sym_velocity",
        "last_update",
    )
    # The only free-text column on the model.
    search_fields = ("channel_info", )
    ordering = ("last_update",)
|
||||
6
dbapp/lyngsatapp/apps.py
Normal file
6
dbapp/lyngsatapp/apps.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class LyngsatappConfig(AppConfig):
    """Django application configuration for the ``lyngsatapp`` package."""

    # Primary-key field type for models that do not declare one explicitly.
    default_auto_field = "django.db.models.BigAutoField"
    # Python path of the application this configuration belongs to.
    name = "lyngsatapp"
|
||||
0
dbapp/lyngsatapp/migrations/__init__.py
Normal file
0
dbapp/lyngsatapp/migrations/__init__.py
Normal file
36
dbapp/lyngsatapp/models.py
Normal file
36
dbapp/lyngsatapp/models.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from django.db import models
|
||||
from mainapp.models import (
|
||||
Satellite,
|
||||
Polarization,
|
||||
Modulation,
|
||||
Standard,
|
||||
get_default_polarization,
|
||||
get_default_modulation,
|
||||
get_default_standard
|
||||
)
|
||||
|
||||
class LyngSat(models.Model):
    """A LyngSat source: a satellite reference plus its signal parameters.

    Polarization, modulation and standard point at the reference tables of
    ``mainapp`` and fall back to that app's default rows when the referenced
    row is deleted.
    """

    # Owning satellite; PROTECT blocks deleting a satellite that still has sources.
    id_satellite = models.ForeignKey(
        Satellite,
        on_delete=models.PROTECT,
        related_name="lyngsat",
        verbose_name="Спутник",
        null=True,
    )
    polarization = models.ForeignKey(
        Polarization,
        default=get_default_polarization,
        on_delete=models.SET_DEFAULT,
        related_name="lyngsat",
        null=True,
        blank=True,
        verbose_name="Поляризация",
    )
    modulation = models.ForeignKey(
        Modulation,
        default=get_default_modulation,
        on_delete=models.SET_DEFAULT,
        related_name="lyngsat",
        null=True,
        blank=True,
        verbose_name="Модуляция",
    )
    standard = models.ForeignKey(
        Standard,
        default=get_default_standard,
        on_delete=models.SET_DEFAULT,
        related_name="lyngsat",
        null=True,
        blank=True,
        verbose_name="Стандарт",
    )
    # Frequency; the verbose name states the unit as MHz.
    frequency = models.FloatField(
        default=0,
        null=True,
        blank=True,
        verbose_name="Частота, МГц",
    )
    # Symbol rate; the verbose name states the unit as baud.
    sym_velocity = models.FloatField(
        default=0,
        null=True,
        blank=True,
        verbose_name="Символьная скорость, БОД",
    )
    last_update = models.DateTimeField(
        null=True,
        blank=True,
        verbose_name="Время",
    )
    channel_info = models.CharField(
        max_length=20,
        blank=True,
        null=True,
        verbose_name="Описание источника",
    )
    # url = models.URLField(max_length = 200, blank=True, null=True, verbose_name="Ссылка на страницу")

    class Meta:
        verbose_name = "Источник LyngSat"
        verbose_name_plural = "Источники LyngSat"

    def __str__(self):
        """Return a short label built from the frequency and the polarization."""
        return f"Ист {self.frequency}, {self.polarization}"
|
||||
|
||||
|
||||
371
dbapp/lyngsatapp/parser.py
Normal file
371
dbapp/lyngsatapp/parser.py
Normal file
@@ -0,0 +1,371 @@
|
||||
import re
import time
from datetime import date, datetime

import requests
from bs4 import BeautifulSoup
|
||||
|
||||
class LyngSatParser:
|
||||
"""Парсер данных для LyngSat(Для работы нужен flaresolver)"""
|
||||
def __init__(
|
||||
self,
|
||||
flaresolver_url: str = "http://localhost:8191/v1",
|
||||
regions: list[str] | None = None,
|
||||
target_sats: list[str] | None = None,
|
||||
):
|
||||
self.flaresolver_url = flaresolver_url
|
||||
self.regions = regions
|
||||
self.target_sats = list(map(lambda sat: sat.strip().lower(), target_sats)) if regions else None
|
||||
self.regions = regions if regions else ["europe", "asia", "america", "atlantic"]
|
||||
self.BASE_URL = "https://www.lyngsat.com"
|
||||
|
||||
def parse_metadata(self, metadata: str) -> dict:
|
||||
if not metadata or not metadata.strip():
|
||||
return {
|
||||
'standard': None,
|
||||
'modulation': None,
|
||||
'symbol_rate': None,
|
||||
'fec': None
|
||||
}
|
||||
normalized = re.sub(r'\s+', '', metadata.strip())
|
||||
fec_match = re.search(r'([1-9]/[1-9])$', normalized)
|
||||
fec = fec_match.group(1) if fec_match else None
|
||||
if fec_match:
|
||||
core = normalized[:fec_match.start()]
|
||||
else:
|
||||
core = normalized
|
||||
std_match = re.match(r'(DVB-S2?|ABS-S|DVB-T2?|ATSC|ISDB)', core)
|
||||
standard = std_match.group(1) if std_match else None
|
||||
rest = core[len(standard):] if standard else core
|
||||
modulation = None
|
||||
mod_match = re.match(r'(8PSK|QPSK|16APSK|32APSK|64QAM|256QAM|BPSK)', rest)
|
||||
if mod_match:
|
||||
modulation = mod_match.group(1)
|
||||
rest = rest[len(modulation):]
|
||||
symbol_rate = None
|
||||
sr_match = re.search(r'(\d+)$', rest)
|
||||
if sr_match:
|
||||
try:
|
||||
symbol_rate = int(sr_match.group(1))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return {
|
||||
'standard': standard,
|
||||
'modulation': modulation,
|
||||
'symbol_rate': symbol_rate,
|
||||
'fec': fec
|
||||
}
|
||||
|
||||
def extract_date(self, s: str) -> datetime | None:
|
||||
s = s.strip()
|
||||
match = re.search(r'(\d{6})$', s)
|
||||
if not match:
|
||||
return None
|
||||
yymmdd = match.group(1)
|
||||
try:
|
||||
return datetime.strptime(yymmdd, '%y%m%d').date()
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def convert_polarization(self, polarization: str) -> str:
|
||||
"""Преобразовать код поляризации в понятное название на русском"""
|
||||
polarization_map = {
|
||||
'V': 'Вертикальная',
|
||||
'H': 'Горизонтальная',
|
||||
'R': 'Правая',
|
||||
'L': 'Левая'
|
||||
}
|
||||
return polarization_map.get(polarization.upper(), polarization)
|
||||
|
||||
def get_region_pages(self) -> list[str]:
|
||||
html_regions = []
|
||||
for region in self.regions:
|
||||
url = f"{self.BASE_URL}/{region}.html"
|
||||
payload = {
|
||||
"cmd": "request.get",
|
||||
"url": url,
|
||||
"maxTimeout": 60000
|
||||
}
|
||||
response = requests.post(self.flaresolver_url, json=payload)
|
||||
if response.status_code != 200:
|
||||
continue
|
||||
html_content = response.json().get("solution", {}).get("response", "")
|
||||
html_regions.append(html_content)
|
||||
print(f"Обработал страницу по {region}")
|
||||
return html_regions
|
||||
|
||||
def get_satellites_data(self) -> dict[dict]:
|
||||
sat_data = {}
|
||||
for region_page in self.get_region_pages():
|
||||
soup = BeautifulSoup(region_page, "html.parser")
|
||||
|
||||
col_table = soup.find_all("div", class_="desktab")[0]
|
||||
|
||||
tables = col_table.find_next_sibling('table').find_all('table')
|
||||
trs = []
|
||||
for table in tables:
|
||||
trs.extend(table.find_all('tr'))
|
||||
for tr in trs:
|
||||
sat_name = tr.find('span').text
|
||||
if self.target_sats is not None:
|
||||
if sat_name.strip().lower() not in self.target_sats:
|
||||
continue
|
||||
try:
|
||||
sat_url = tr.find_all('a')[2]['href']
|
||||
except IndexError:
|
||||
sat_url = tr.find_all('a')[0]['href']
|
||||
|
||||
update_date = tr.find_all('td')[-1].text
|
||||
sat_response = requests.post(self.flaresolver_url, json={
|
||||
"cmd": "request.get",
|
||||
"url": f"{self.BASE_URL}/{sat_url}",
|
||||
"maxTimeout": 60000
|
||||
})
|
||||
html_content = sat_response.json().get("solution", {}).get("response", "")
|
||||
sat_page_data = self.get_satellite_content(html_content)
|
||||
sat_data[sat_name] = {
|
||||
"url": f"{self.BASE_URL}/{sat_url}",
|
||||
"update_date": datetime.strptime(update_date, "%y%m%d").date(),
|
||||
"sources": sat_page_data
|
||||
}
|
||||
return sat_data
|
||||
|
||||
def get_satellite_content(self, html_content: str) -> dict:
|
||||
sat_soup = BeautifulSoup(html_content, "html.parser")
|
||||
big_table = sat_soup.find('table', class_='bigtable')
|
||||
all_tables = big_table.find_all("div", class_="desktab")[:-1]
|
||||
data = []
|
||||
for table in all_tables:
|
||||
trs = table.find_next_sibling('table').find_all('tr')
|
||||
for idx, tr in enumerate(trs):
|
||||
tds = tr.find_all('td')
|
||||
if len(tds) < 9 or idx < 2:
|
||||
continue
|
||||
freq, polarization = tds[0].find('b').text.strip().split('\xa0')
|
||||
polarization = self.convert_polarization(polarization)
|
||||
meta = self.parse_metadata(tds[1].text)
|
||||
provider_name = tds[3].text
|
||||
last_update = self.extract_date(tds[-1].text)
|
||||
data.append({
|
||||
"freq": freq,
|
||||
"pol": polarization,
|
||||
"metadata": meta,
|
||||
"provider_name": provider_name,
|
||||
"last_update": last_update
|
||||
})
|
||||
return data
|
||||
|
||||
|
||||
class KingOfSatParser:
    """Scraper for the satellite list and the per-satellite .ini files of KingOfSat.

    The satellite table is read from ``<base_url>/satellites``; for every row
    that offers an .ini download the file is fetched and its ``[SATTYPE]``
    and ``[DVB]`` sections are parsed.
    """

    # HTTP timeout in seconds for every request made through the session.
    REQUEST_TIMEOUT = 30

    # Order of the comma-separated values of one [DVB] entry.
    _INI_FIELDS = (
        'frequency',
        'polarization',
        'symbol_rate',
        'fec',
        'standard',
        'modulation',
    )
    _POSITION_RE = re.compile(r'([\d\.]+)°([EW])')

    _POLARIZATION_NAMES = {
        'V': 'Вертикальная',
        'H': 'Горизонтальная',
        'R': 'Правая',
        'L': 'Левая'
    }

    def __init__(self, base_url="https://ru.kingofsat.net", max_satellites=0):
        """Create the parser and its HTTP session.

        :param base_url: Base URL of the site.
        :param max_satellites: Maximum number of satellites to process (0 - all).
        """
        self.base_url = base_url
        self.max_satellites = max_satellites
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    def convert_polarization(self, polarization):
        """Translate a polarization code (V/H/R/L, any case) to its Russian name.

        Unknown codes are returned unchanged.
        """
        return self._POLARIZATION_NAMES.get(polarization.upper(), polarization)

    def _get_text(self, url, error_label):
        """GET *url* and return the body text, or None after printing *error_label*."""
        try:
            response = self.session.get(url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"{error_label} {url}: {e}")
            return None
        return response.text

    def fetch_page(self, url):
        """Return the HTML of *url*, or None when the request fails."""
        return self._get_text(url, "Ошибка при получении страницы")

    def download_ini_file(self, url):
        """Return the content of the .ini file at *url*, or None when the request fails."""
        return self._get_text(url, "Ошибка при скачивании .ini файла")

    def _parse_satellite_row(self, cols):
        """Build the satellite dict for one table row, or None when the row is unusable.

        A row is usable when it has a name, a parsable orbital position and
        an .ini download icon.
        """
        # Orbital position (1st column), e.g. "13.0°E" -> "13.0E".
        position_match = self._POSITION_RE.search(cols[0].text.strip())
        if not position_match:
            return None
        position = f"{position_match.group(1)}{position_match.group(2)}"

        # Satellite name (2nd column) with whitespace runs collapsed.
        satellite_name = re.sub(r'\s+', ' ', cols[1].get_text(strip=True)).strip()
        if not satellite_name:
            return None

        # The .ini download is offered only when the 4th column shows the
        # diskette icon.
        if not cols[3].find('img', src=lambda x: x and 'disquette.gif' in x):
            return None

        # NORAD id (3rd column); "-" or an empty cell means it is unknown.
        norad = cols[2].text.strip()
        if not norad or norad == "-":
            norad = None

        return {
            'position': position,
            'name': satellite_name,
            'norad': norad,
            # Built from the configured base URL rather than a fixed host.
            'ini_url': f"{self.base_url}/dl.php?pos={position}&fkhz=0",
            # Update date is taken from the 13th column.
            'update_date': cols[12].text.strip()
        }

    def parse_satellite_table(self, html_content):
        """Parse the satellite table of the list page.

        :param html_content: HTML of the satellite list page.
        :return: List of dicts with ``position``, ``name``, ``norad``,
            ``ini_url`` and ``update_date``; empty when no table is found.
        """
        soup = BeautifulSoup(html_content, 'html.parser')
        satellites = []
        table = soup.find('table')
        if table is None:
            print("Таблица не найдена")
            return satellites

        # The first row is the table header.
        for row in table.find_all('tr')[1:]:
            cols = row.find_all('td')
            if len(cols) < 13:
                continue
            try:
                satellite = self._parse_satellite_row(cols)
            except Exception as e:
                # Best effort: one malformed row must not abort the whole
                # table; the error is reported and the row is skipped.
                print(f"Ошибка при обработке строки таблицы: {e}")
                continue
            if satellite is not None:
                satellites.append(satellite)

        return satellites

    @staticmethod
    def _ini_section(ini_content, name):
        """Return the ``key=value`` pairs of section *name* as (key, value) tuples.

        The section ends at the next ``[`` that starts a line or at the end
        of the content, so it is found wherever it sits in the file.
        """
        match = re.search(rf'\[{name}\](.*?)(?:\n\[|$)', ini_content, re.DOTALL)
        if not match:
            return []
        pairs = []
        for line in match.group(1).strip().split('\n'):
            line = line.strip()
            if '=' in line:
                key, value = line.split('=', 1)
                pairs.append((key.strip(), value.strip()))
        return pairs

    def parse_ini_file(self, ini_content):
        """Parse the content of an .ini file.

        :param ini_content: Text of the file.
        :return: Dict with ``metadata`` (currently always empty),
            ``sattype`` (key -> value) and ``dvb`` (key -> dict with
            ``frequency``, ``polarization``, ``symbol_rate``, ``fec``,
            ``standard`` and ``modulation``; missing values are ``''``).
        """
        data = {
            'metadata': {},
            'sattype': {},
            'dvb': {}
        }

        for key, value in self._ini_section(ini_content, 'SATTYPE'):
            data['sattype'][key] = value

        # NOTE(review): every key=value line becomes an entry, including a
        # leading "0=<count>" line if the file has one - confirm whether
        # that line should be excluded from the transponders.
        for key, value in self._ini_section(ini_content, 'DVB'):
            params = [p.strip() for p in value.split(',')]
            # Pad to the expected number of values; extra values are ignored.
            params += [''] * (len(self._INI_FIELDS) - len(params))
            entry = dict(zip(self._INI_FIELDS, params))
            if entry['polarization']:
                entry['polarization'] = self.convert_polarization(entry['polarization'])
            data['dvb'][key] = entry

        return data

    def get_all_satellites_data(self):
        """Fetch and parse the .ini file of every satellite, honouring ``max_satellites``.

        :return: List of dicts with ``satellite_name``, ``position``,
            ``norad``, ``update_date``, ``ini_url`` and ``ini_data``;
            satellites whose file cannot be downloaded are left out.
        """
        html_content = self.fetch_page(self.base_url + '/satellites')
        if not html_content:
            return []

        satellites = self.parse_satellite_table(html_content)
        limit = self.max_satellites
        if limit > 0:
            satellites = satellites[:limit]

        results = []
        for satellite in satellites:
            print(f"Обработка спутника: {satellite['name']} ({satellite['position']})")

            ini_content = self.download_ini_file(satellite['ini_url'])
            if not ini_content:
                print(f"Не удалось скачать .ini файл для {satellite['name']}")
                continue

            results.append({
                'satellite_name': satellite['name'],
                'position': satellite['position'],
                'norad': satellite['norad'],
                'update_date': satellite['update_date'],
                'ini_url': satellite['ini_url'],
                'ini_data': self.parse_ini_file(ini_content)
            })

            if limit > 0 and len(results) >= limit:
                break

            # Pause between downloads to avoid hammering the site.
            time.sleep(1)

        return results

    def create_satellite_dict(self, satellites_data):
        """Index the results of ``get_all_satellites_data`` by satellite.

        :param satellites_data: List returned by ``get_all_satellites_data``.
        :return: Dict keyed by ``"<position>_<name>"`` (spaces and slashes in
            the name replaced by underscores) with the satellite details and
            its transponders.
        """
        satellite_dict = {}
        for data in satellites_data:
            safe_name = data['satellite_name'].replace(' ', '_').replace('/', '_')
            ini_data = data['ini_data']
            satellite_dict[f"{data['position']}_{safe_name}"] = {
                'name': data['satellite_name'],
                'position': data['position'],
                'norad': data['norad'],
                'update_date': data['update_date'],
                'ini_url': data['ini_url'],
                'transponders_count': len(ini_data['dvb']),
                'transponders': ini_data['dvb'],
                'sattype_info': ini_data['sattype'],
                'metadata': ini_data['metadata']
            }
        return satellite_dict
|
||||
3
dbapp/lyngsatapp/tests.py
Normal file
3
dbapp/lyngsatapp/tests.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from django.test import TestCase
|
||||
|
||||
# Create your tests here.
|
||||
3
dbapp/lyngsatapp/views.py
Normal file
3
dbapp/lyngsatapp/views.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from django.shortcuts import render
|
||||
|
||||
# Create your views here.
|
||||
Reference in New Issue
Block a user