# dbstorage/dbapp/lyngsatapp/parser.py
import re
import time
from datetime import date, datetime

import requests
from bs4 import BeautifulSoup


def parse_satellite_names(satellite_string: str) -> list[str]:
    """Split a combined satellite name like "A / B (C)" into normalized parts."""
    slash_parts = [part.strip() for part in satellite_string.split("/")]
    all_names = []
    for part in slash_parts:
        # Main name: everything before an opening bracket.
        main_match = re.match(r"^([^(]+)", part)
        if main_match:
            main_name = main_match.group(1).strip()
            if main_name:
                all_names.append(main_name)
        # Alternative name in brackets, e.g. "Eutelsat 33C (Eutelsat 33E)".
        bracket_match = re.search(r"\(([^)]+)\)", part)
        if bracket_match:
            bracket_name = bracket_match.group(1).strip()
            if bracket_name:
                all_names.append(bracket_name)
    # De-duplicate case-insensitively while preserving order.
    seen = set()
    result = []
    for name in all_names:
        key = name.strip().lower()
        if key not in seen:
            seen.add(key)
            result.append(key)
    return result
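
# Illustrative check (hypothetical input string, not taken from LyngSat):
#   parse_satellite_names("Astra 1KR / Eutelsat 33C (Eutelsat 33E)")
#   -> ["astra 1kr", "eutelsat 33c", "eutelsat 33e"]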


class LyngSatParser:
    """LyngSat data parser (requires a running FlareSolverr instance)."""

    def __init__(
        self,
        flaresolver_url: str = "http://localhost:8191/v1",
        regions: list[str] | None = None,
        target_sats: list[str] | None = None,
    ):
        self.flaresolver_url = flaresolver_url
        # Normalize target names once so later lookups are case-insensitive.
        self.target_sats = (
            [sat.strip().lower() for sat in target_sats] if target_sats else None
        )
        self.regions = regions if regions else ["europe", "asia", "america", "atlantic"]
        self.BASE_URL = "https://www.lyngsat.com"

    def parse_metadata(self, metadata: str) -> dict:
        """Parse a LyngSat system/modulation/SR/FEC string such as "DVB-S2 8PSK 30000 3/4"."""
        if not metadata or not metadata.strip():
            return {
                "standard": None,
                "modulation": None,
                "symbol_rate": None,
                "fec": None,
            }
        # Collapse all whitespace so the tokens can be peeled off positionally.
        normalized = re.sub(r"\s+", "", metadata.strip())
        # FEC is the trailing "n/m" fraction, if present.
        fec_match = re.search(r"([1-9]/[1-9])$", normalized)
        fec = fec_match.group(1) if fec_match else None
        core = normalized[: fec_match.start()] if fec_match else normalized
        std_match = re.match(r"(DVB-S2?|ABS-S|DVB-T2?|ATSC|ISDB)", core)
        standard = std_match.group(1) if std_match else None
        rest = core[len(standard):] if standard else core
        modulation = None
        mod_match = re.match(r"(8PSK|QPSK|16APSK|32APSK|64QAM|256QAM|BPSK)", rest)
        if mod_match:
            modulation = mod_match.group(1)
            rest = rest[len(modulation):]
        # Whatever digits remain at the end are the symbol rate.
        symbol_rate = None
        sr_match = re.search(r"(\d+)$", rest)
        if sr_match:
            try:
                symbol_rate = int(sr_match.group(1))
            except ValueError:
                pass
        return {
            "standard": standard,
            "modulation": modulation,
            "symbol_rate": symbol_rate,
            "fec": fec,
        }
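
    # Illustrative call (hypothetical metadata string):
    #   parse_metadata("DVB-S2 8PSK 30000 3/4")
    #   -> {"standard": "DVB-S2", "modulation": "8PSK",
    #       "symbol_rate": 30000, "fec": "3/4"}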

    def extract_date(self, s: str) -> date | None:
        """Extract a trailing YYMMDD date stamp from a table cell, if any."""
        s = s.strip()
        match = re.search(r"(\d{6})$", s)
        if not match:
            return None
        yymmdd = match.group(1)
        try:
            return datetime.strptime(yymmdd, "%y%m%d").date()
        except ValueError:
            return None
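
    # Illustrative call (hypothetical cell text): extract_date("C+ 250101")
    # returns datetime.date(2025, 1, 1); cells without a trailing YYMMDD yield None.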

    def convert_polarization(self, polarization: str) -> str:
        """Convert a polarization code to its human-readable Russian name."""
        polarization_map = {
            "V": "Вертикальная",  # vertical
            "H": "Горизонтальная",  # horizontal
            "R": "Правая",  # right-hand circular
            "L": "Левая",  # left-hand circular
        }
        return polarization_map.get(polarization.upper(), polarization)

    def get_region_pages(self, regions: list[str] | None = None) -> list[str]:
        """Fetch the HTML of each regional satellite list via FlareSolverr."""
        html_regions = []
        if regions is None:
            regions = self.regions
        for region in regions:
            url = f"{self.BASE_URL}/{region}.html"
            payload = {"cmd": "request.get", "url": url, "maxTimeout": 60000}
            response = requests.post(self.flaresolver_url, json=payload)
            if response.status_code != 200:
                continue
            html_content = response.json().get("solution", {}).get("response", "")
            html_regions.append(html_content)
            print(f"Fetched the {region} page")
        return html_regions
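
    # FlareSolverr wraps the proxied page in a JSON envelope; the raw HTML sits
    # under solution.response (shape per the FlareSolverr v1 API, abridged):
    #   {"status": "ok", "solution": {"url": "...", "status": 200, "response": "<html>..."}}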

    def get_satellite_urls(self, html_regions: list[str]):
        sat_names = []
        sat_urls = []
        for region_page in html_regions:
            soup = BeautifulSoup(region_page, "html.parser")
            col_table = soup.find_all("div", class_="desktab")[0]
            tables = col_table.find_next_sibling("table").find_all("table")
            trs = []
            for table in tables:
                trs.extend(table.find_all("tr"))
            for tr in trs:
                sat_name = tr.find("span").text
                if self.target_sats is not None:
                    if sat_name.strip().lower() not in self.target_sats:
                        continue
                try:
                    sat_url = tr.find_all("a")[2]["href"]
                except IndexError:
                    sat_url = tr.find_all("a")[0]["href"]
                sat_names.append(sat_name)
                sat_urls.append(sat_url)
        return sat_names, sat_urls
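
    # Returns two parallel lists: display names and relative page hrefs,
    # e.g. (illustrative): (["Astra 1KR"], ["Astra-1KR.html"])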

    def get_satellites_data(self) -> dict[str, dict]:
        """Collect transponder data for every (matching) satellite, keyed by name."""
        sat_data = {}
        for region_page in self.get_region_pages(self.regions):
            soup = BeautifulSoup(region_page, "html.parser")
            col_table = soup.find_all("div", class_="desktab")[0]
            tables = col_table.find_next_sibling("table").find_all("table")
            trs = []
            for table in tables:
                trs.extend(table.find_all("tr"))
            for tr in trs:
                # Normalize the name ("ü" -> "u" handles e.g. "Türksat").
                sat_name = tr.find("span").text.replace("ü", "u").strip().lower()
                if self.target_sats is not None:
                    names = parse_satellite_names(sat_name)
                    if len(names) == 1:
                        sat_name = names[0]
                    else:
                        # Prefer whichever alias appears in the target list.
                        for name in names:
                            if name in self.target_sats:
                                sat_name = name
                    if sat_name not in self.target_sats:
                        continue
                try:
                    sat_url = tr.find_all("a")[2]["href"]
                except IndexError:
                    sat_url = tr.find_all("a")[0]["href"]
                update_date = tr.find_all("td")[-1].text.strip()
                sat_response = requests.post(
                    self.flaresolver_url,
                    json={
                        "cmd": "request.get",
                        "url": f"{self.BASE_URL}/{sat_url}",
                        "maxTimeout": 60000,
                    },
                )
                html_content = (
                    sat_response.json().get("solution", {}).get("response", "")
                )
                sat_page_data = self.get_satellite_content(html_content)
                sat_data[sat_name] = {
                    "url": f"{self.BASE_URL}/{sat_url}",
                    "update_date": datetime.strptime(update_date, "%y%m%d").date(),
                    "sources": sat_page_data,
                }
        return sat_data
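
    # Shape of the returned mapping (values illustrative):
    #   {"astra 1kr": {"url": "https://www.lyngsat.com/Astra-1KR.html",
    #                  "update_date": datetime.date(2025, 1, 1),
    #                  "sources": [...]}}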

    def get_satellite_content(self, html_content: str) -> list[dict]:
        """Parse the transponder rows from a single satellite page."""
        data = []
        sat_soup = BeautifulSoup(html_content, "html.parser")
        try:
            big_table = sat_soup.find("table", class_="bigtable")
            all_tables = big_table.find_all("div", class_="desktab")[:-1]
            for table in all_tables:
                trs = table.find_next_sibling("table").find_all("tr")
                for idx, tr in enumerate(trs):
                    tds = tr.find_all("td")
                    # The first two rows are headers; data rows have at least 9 cells.
                    if len(tds) < 9 or idx < 2:
                        continue
                    # Frequency and polarization share one cell, separated by &nbsp;.
                    freq, polarization = tds[0].find("b").text.strip().split("\xa0")
                    polarization = self.convert_polarization(polarization)
                    meta = self.parse_metadata(tds[1].text)
                    provider_name = tds[3].text
                    last_update = self.extract_date(tds[-1].text)
                    data.append(
                        {
                            "freq": freq,
                            "pol": polarization,
                            "metadata": meta,
                            "provider_name": provider_name,
                            "last_update": last_update,
                        }
                    )
        except Exception as e:
            print(e)
        return data
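
    # Each returned entry looks like (values illustrative):
    #   {"freq": "11739", "pol": "Вертикальная",
    #    "metadata": {"standard": "DVB-S2", "modulation": "8PSK",
    #                 "symbol_rate": 27500, "fec": "3/4"},
    #    "provider_name": "...", "last_update": datetime.date(2025, 1, 1)}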


class KingOfSatParser:
    def __init__(self, base_url="https://ru.kingofsat.net", max_satellites=0):
        """
        Initialize the parser.

        :param base_url: Base URL of the site
        :param max_satellites: Maximum number of satellites to parse (0 = all)
        """
        self.base_url = base_url
        self.max_satellites = max_satellites
        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/91.0.4472.124 Safari/537.36"
                )
            }
        )

    def convert_polarization(self, polarization):
        """Convert a polarization code to its human-readable Russian name."""
        polarization_map = {
            "V": "Вертикальная",  # vertical
            "H": "Горизонтальная",  # horizontal
            "R": "Правая",  # right-hand circular
            "L": "Левая",  # left-hand circular
        }
        return polarization_map.get(polarization.upper(), polarization)

    def fetch_page(self, url):
        """Fetch an HTML page."""
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"Error fetching page {url}: {e}")
            return None

    def parse_satellite_table(self, html_content):
        """Parse the satellites table."""
        soup = BeautifulSoup(html_content, "html.parser")
        satellites = []
        table = soup.find("table")
        if not table:
            print("Table not found")
            return satellites
        rows = table.find_all("tr")[1:]
        for row in rows:
            cols = row.find_all("td")
            if len(cols) < 13:
                continue
            try:
                # Orbital position (1st column), e.g. "19.2°E" -> "19.2E".
                position_cell = cols[0].text.strip()
                position_match = re.search(r"([\d\.]+)°([EW])", position_cell)
                if position_match:
                    position_value = position_match.group(1)
                    position_direction = position_match.group(2)
                    position = f"{position_value}{position_direction}"
                else:
                    position = None
                # Satellite name (2nd column); collapse stray whitespace.
                satellite_cell = cols[1]
                satellite_name = satellite_cell.get_text(strip=True)
                satellite_name = re.sub(r"\s+", " ", satellite_name).strip()
                # NORAD ID (3rd column).
                norad = cols[2].text.strip()
                if not norad or norad == "-":
                    norad = None
                # A diskette icon in the 4th column marks a downloadable .ini file.
                ini_link = None
                ini_cell = cols[3]
                ini_img = ini_cell.find("img", src=lambda x: x and "disquette.gif" in x)
                if ini_img and position:
                    ini_link = f"https://ru.kingofsat.net/dl.php?pos={position}&fkhz=0"
                update_date = cols[12].text.strip() if len(cols) > 12 else None
                if satellite_name and ini_link and position:
                    satellites.append(
                        {
                            "position": position,
                            "name": satellite_name,
                            "norad": norad,
                            "ini_url": ini_link,
                            "update_date": update_date,
                        }
                    )
            except Exception as e:
                print(f"Error while processing table row: {e}")
                continue
        return satellites
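
    # Each parsed row becomes (values illustrative):
    #   {"position": "19.2E", "name": "Astra 1KR", "norad": "...",
    #    "ini_url": "https://ru.kingofsat.net/dl.php?pos=19.2E&fkhz=0",
    #    "update_date": "..."}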

    def parse_ini_file(self, ini_content):
        """Parse the contents of an .ini file."""
        data = {"metadata": {}, "sattype": {}, "dvb": {}}
        # Metadata extraction from the download banner is currently disabled:
        # metadata_match = re.search(r'\[ downloaded from www\.kingofsat\.net \(c\) (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \]', ini_content)
        # if metadata_match:
        #     data['metadata']['downloaded'] = metadata_match.group(1)
        # Parse the [SATTYPE] section.
        sattype_match = re.search(r"\[SATTYPE\](.*?)\n\[", ini_content, re.DOTALL)
        if sattype_match:
            sattype_content = sattype_match.group(1).strip()
            for line in sattype_content.split("\n"):
                line = line.strip()
                if "=" in line:
                    key, value = line.split("=", 1)
                    data["sattype"][key.strip()] = value.strip()
        # Parse the [DVB] section: each line is
        # index=frequency,polarization,symbol_rate,fec,standard,modulation
        dvb_match = re.search(r"\[DVB\](.*?)(?:\n\[|$)", ini_content, re.DOTALL)
        if dvb_match:
            dvb_content = dvb_match.group(1).strip()
            for line in dvb_content.split("\n"):
                line = line.strip()
                if "=" in line:
                    key, value = line.split("=", 1)
                    params = [p.strip() for p in value.split(",")]
                    polarization = params[1] if len(params) > 1 else ""
                    if polarization:
                        polarization = self.convert_polarization(polarization)
                    data["dvb"][key.strip()] = {
                        "frequency": params[0] if len(params) > 0 else "",
                        "polarization": polarization,
                        "symbol_rate": params[2] if len(params) > 2 else "",
                        "fec": params[3] if len(params) > 3 else "",
                        "standard": params[4] if len(params) > 4 else "",
                        "modulation": params[5] if len(params) > 5 else "",
                    }
        return data
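
    # Illustrative .ini fragment and result (field order as assumed by the code above):
    #   [SATTYPE]
    #   1=0192
    #   2=Astra 19.2E
    #   [DVB]
    #   0=11229,V,22000,5/6,DVB-S2,8PSK
    # parse_ini_file(...) -> {"metadata": {},
    #                         "sattype": {"1": "0192", "2": "Astra 19.2E"},
    #                         "dvb": {"0": {"frequency": "11229",
    #                                       "polarization": "Вертикальная",
    #                                       "symbol_rate": "22000", "fec": "5/6",
    #                                       "standard": "DVB-S2", "modulation": "8PSK"}}}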

    def download_ini_file(self, url):
        """Download the contents of an .ini file."""
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"Error downloading .ini file {url}: {e}")
            return None

    def get_all_satellites_data(self):
        """Fetch data for every satellite, honoring the max_satellites limit."""
        html_content = self.fetch_page(self.base_url + "/satellites")
        if not html_content:
            return []
        satellites = self.parse_satellite_table(html_content)
        if self.max_satellites > 0 and len(satellites) > self.max_satellites:
            satellites = satellites[: self.max_satellites]
        results = []
        processed_count = 0
        for satellite in satellites:
            print(f"Processing satellite: {satellite['name']} ({satellite['position']})")
            ini_content = self.download_ini_file(satellite["ini_url"])
            if not ini_content:
                print(f"Failed to download the .ini file for {satellite['name']}")
                continue
            parsed_ini = self.parse_ini_file(ini_content)
            result = {
                "satellite_name": satellite["name"],
                "position": satellite["position"],
                "norad": satellite["norad"],
                "update_date": satellite["update_date"],
                "ini_url": satellite["ini_url"],
                "ini_data": parsed_ini,
            }
            results.append(result)
            processed_count += 1
            if self.max_satellites > 0 and processed_count >= self.max_satellites:
                break
            # Be polite to the server between downloads.
            time.sleep(1)
        return results

    def create_satellite_dict(self, satellites_data):
        """Build a dictionary of satellite data keyed by position and name."""
        satellite_dict = {}
        for data in satellites_data:
            key = f"{data['position']}_{data['satellite_name'].replace(' ', '_').replace('/', '_')}"
            satellite_dict[key] = {
                "name": data["satellite_name"],
                "position": data["position"],
                "norad": data["norad"],
                "update_date": data["update_date"],
                "ini_url": data["ini_url"],
                "transponders_count": len(data["ini_data"]["dvb"]),
                "transponders": data["ini_data"]["dvb"],
                "sattype_info": data["ini_data"]["sattype"],
                "metadata": data["ini_data"]["metadata"],
            }
        return satellite_dict
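

# Minimal usage sketch (illustrative, not part of the original module): the
# FlareSolverr URL, region, and target satellite name below are assumptions.
if __name__ == "__main__":
    kos = KingOfSatParser(max_satellites=2)  # limit to 2 satellites for a dry run
    kos_dict = kos.create_satellite_dict(kos.get_all_satellites_data())
    print(f"KingOfSat: parsed {len(kos_dict)} satellites")

    lyngsat = LyngSatParser(
        flaresolver_url="http://localhost:8191/v1",  # assumed local FlareSolverr
        regions=["europe"],
        target_sats=["astra 1kr"],  # hypothetical target name
    )
    for name, info in lyngsat.get_satellites_data().items():
        print(name, info["url"], len(info["sources"]), "transponders")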