# Source listing: 438 lines, 18 KiB, Python
import re
import time
from datetime import date, datetime
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
|
||
|
||
def parse_satellite_names(satellite_string: str) -> list[str]:
    """Split a combined satellite label into its individual lowercase names.

    The label is split on ``/``. For every part, the text before the first
    ``(`` and the text inside the first ``(...)`` group are both treated as
    names, so ``"A (B) / C"`` yields ``["a", "b", "c"]``.

    Args:
        satellite_string: Raw label, e.g. ``"Eutelsat 36B (Express AMU1)"``.

    Returns:
        The distinct names, stripped and lowercased, in order of first
        appearance. Duplicates are detected case-insensitively.
    """
    # A dict is used as an insertion-ordered set. Keys are the normalised
    # (lowercased) names, so "Astra" and "ASTRA" collapse into one entry.
    names: dict[str, None] = {}
    for part in satellite_string.split("/"):
        part = part.strip()
        main_match = re.match(r"^([^(]+)", part)
        bracket_match = re.search(r"\(([^)]+)\)", part)
        for match in (main_match, bracket_match):
            if match is None:
                continue
            name = match.group(1).strip().lower()
            if name:
                names.setdefault(name)
    return list(names)
|
||
|
||
|
||
class LyngSatParser:
|
||
"""Парсер данных для LyngSat(Для работы нужен flaresolver)"""
|
||
|
||
def __init__(
|
||
self,
|
||
flaresolver_url: str = "http://localhost:8191/v1",
|
||
regions: list[str] | None = None,
|
||
target_sats: list[str] | None = None,
|
||
):
|
||
self.flaresolver_url = flaresolver_url
|
||
self.regions = regions
|
||
self.target_sats = (
|
||
list(map(lambda sat: sat.strip().lower(), target_sats)) if regions else None
|
||
)
|
||
self.regions = regions if regions else ["europe", "asia", "america", "atlantic"]
|
||
self.BASE_URL = "https://www.lyngsat.com"
|
||
|
||
def parse_metadata(self, metadata: str) -> dict:
|
||
if not metadata or not metadata.strip():
|
||
return {
|
||
"standard": None,
|
||
"modulation": None,
|
||
"symbol_rate": None,
|
||
"fec": None,
|
||
}
|
||
normalized = re.sub(r"\s+", "", metadata.strip())
|
||
fec_match = re.search(r"([1-9]/[1-9])$", normalized)
|
||
fec = fec_match.group(1) if fec_match else None
|
||
if fec_match:
|
||
core = normalized[: fec_match.start()]
|
||
else:
|
||
core = normalized
|
||
std_match = re.match(r"(DVB-S2?|ABS-S|DVB-T2?|ATSC|ISDB)", core)
|
||
standard = std_match.group(1) if std_match else None
|
||
rest = core[len(standard) :] if standard else core
|
||
modulation = None
|
||
mod_match = re.match(r"(8PSK|QPSK|16APSK|32APSK|64QAM|256QAM|BPSK)", rest)
|
||
if mod_match:
|
||
modulation = mod_match.group(1)
|
||
rest = rest[len(modulation) :]
|
||
symbol_rate = None
|
||
sr_match = re.search(r"(\d+)$", rest)
|
||
if sr_match:
|
||
try:
|
||
symbol_rate = int(sr_match.group(1))
|
||
except ValueError:
|
||
pass
|
||
|
||
return {
|
||
"standard": standard,
|
||
"modulation": modulation,
|
||
"symbol_rate": symbol_rate,
|
||
"fec": fec,
|
||
}
|
||
|
||
def extract_date(self, s: str) -> datetime | None:
|
||
s = s.strip()
|
||
match = re.search(r"(\d{6})$", s)
|
||
if not match:
|
||
return None
|
||
yymmdd = match.group(1)
|
||
try:
|
||
return datetime.strptime(yymmdd, "%y%m%d").date()
|
||
except ValueError:
|
||
return None
|
||
|
||
def convert_polarization(self, polarization: str) -> str:
|
||
"""Преобразовать код поляризации в понятное название на русском"""
|
||
polarization_map = {
|
||
"V": "Вертикальная",
|
||
"H": "Горизонтальная",
|
||
"R": "Правая",
|
||
"L": "Левая",
|
||
}
|
||
return polarization_map.get(polarization.upper(), polarization)
|
||
|
||
def get_region_pages(self, regions: list[str] | None = None) -> list[str]:
|
||
html_regions = []
|
||
if regions is None:
|
||
regions = self.regions
|
||
for region in regions:
|
||
url = f"{self.BASE_URL}/{region}.html"
|
||
payload = {"cmd": "request.get", "url": url, "maxTimeout": 60000}
|
||
response = requests.post(self.flaresolver_url, json=payload)
|
||
if response.status_code != 200:
|
||
continue
|
||
html_content = response.json().get("solution", {}).get("response", "")
|
||
html_regions.append(html_content)
|
||
print(f"Обработал страницу по {region}")
|
||
return html_regions
|
||
|
||
def get_satellite_urls(self, html_regions: list[str]):
|
||
sat_names = []
|
||
sat_urls = []
|
||
for region_page in html_regions:
|
||
soup = BeautifulSoup(region_page, "html.parser")
|
||
|
||
col_table = soup.find_all("div", class_="desktab")[0]
|
||
|
||
tables = col_table.find_next_sibling("table").find_all("table")
|
||
trs = []
|
||
for table in tables:
|
||
trs.extend(table.find_all("tr"))
|
||
for tr in trs:
|
||
sat_name = tr.find("span").text
|
||
if self.target_sats is not None:
|
||
if sat_name.strip().lower() not in self.target_sats:
|
||
continue
|
||
try:
|
||
sat_url = tr.find_all("a")[2]["href"]
|
||
except IndexError:
|
||
sat_url = tr.find_all("a")[0]["href"]
|
||
sat_names.append(sat_name)
|
||
sat_urls.append(sat_url)
|
||
return sat_names, sat_urls
|
||
|
||
def get_satellites_data(self) -> dict[dict]:
|
||
sat_data = {}
|
||
for region_page in self.get_region_pages(self.regions):
|
||
soup = BeautifulSoup(region_page, "html.parser")
|
||
|
||
col_table = soup.find_all("div", class_="desktab")[0]
|
||
|
||
tables = col_table.find_next_sibling("table").find_all("table")
|
||
trs = []
|
||
for table in tables:
|
||
trs.extend(table.find_all("tr"))
|
||
for tr in trs:
|
||
sat_name = tr.find("span").text.replace("ü", "u").strip().lower()
|
||
if self.target_sats is not None:
|
||
names = parse_satellite_names(sat_name)
|
||
if len(names) == 1:
|
||
sat_name = names[0]
|
||
else:
|
||
for name in names:
|
||
if name in self.target_sats:
|
||
sat_name = name
|
||
if sat_name not in self.target_sats:
|
||
continue
|
||
try:
|
||
sat_url = tr.find_all("a")[2]["href"]
|
||
except IndexError:
|
||
sat_url = tr.find_all("a")[0]["href"]
|
||
|
||
update_date = tr.find_all("td")[-1].text
|
||
sat_response = requests.post(
|
||
self.flaresolver_url,
|
||
json={
|
||
"cmd": "request.get",
|
||
"url": f"{self.BASE_URL}/{sat_url}",
|
||
"maxTimeout": 60000,
|
||
},
|
||
)
|
||
html_content = (
|
||
sat_response.json().get("solution", {}).get("response", "")
|
||
)
|
||
sat_page_data = self.get_satellite_content(html_content)
|
||
sat_data[sat_name] = {
|
||
"url": f"{self.BASE_URL}/{sat_url}",
|
||
"update_date": datetime.strptime(update_date, "%y%m%d").date(),
|
||
"sources": sat_page_data,
|
||
}
|
||
return sat_data
|
||
|
||
def get_satellite_content(self, html_content: str) -> list[dict]:
|
||
data = []
|
||
sat_soup = BeautifulSoup(html_content, "html.parser")
|
||
try:
|
||
big_table = sat_soup.find("table", class_="bigtable")
|
||
all_tables = big_table.find_all("div", class_="desktab")[:-1]
|
||
for table in all_tables:
|
||
trs = table.find_next_sibling("table").find_all("tr")
|
||
for idx, tr in enumerate(trs):
|
||
tds = tr.find_all("td")
|
||
if len(tds) < 9 or idx < 2:
|
||
continue
|
||
freq, polarization = tds[0].find("b").text.strip().split("\xa0")
|
||
polarization = self.convert_polarization(polarization)
|
||
meta = self.parse_metadata(tds[1].text)
|
||
provider_name = tds[3].text
|
||
last_update = self.extract_date(tds[-1].text)
|
||
data.append(
|
||
{
|
||
"freq": freq,
|
||
"pol": polarization,
|
||
"metadata": meta,
|
||
"provider_name": provider_name,
|
||
"last_update": last_update,
|
||
}
|
||
)
|
||
except Exception as e:
|
||
print(e)
|
||
return data if data else data[{}]
|
||
|
||
|
||
class KingOfSatParser:
|
||
def __init__(self, base_url="https://ru.kingofsat.net", max_satellites=0):
|
||
"""
|
||
Инициализация парсера
|
||
:param base_url: Базовый URL сайта
|
||
:param max_satellites: Максимальное количество спутников для парсинга (0 - все)
|
||
"""
|
||
self.base_url = base_url
|
||
self.max_satellites = max_satellites
|
||
self.session = requests.Session()
|
||
self.session.headers.update(
|
||
{
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
||
}
|
||
)
|
||
|
||
def convert_polarization(self, polarization):
|
||
"""Преобразовать код поляризации в понятное название на русском"""
|
||
polarization_map = {
|
||
"V": "Вертикальная",
|
||
"H": "Горизонтальная",
|
||
"R": "Правая",
|
||
"L": "Левая",
|
||
}
|
||
return polarization_map.get(polarization.upper(), polarization)
|
||
|
||
def fetch_page(self, url):
|
||
"""Получить HTML страницу"""
|
||
try:
|
||
response = self.session.get(url, timeout=30)
|
||
response.raise_for_status()
|
||
return response.text
|
||
except Exception as e:
|
||
print(f"Ошибка при получении страницы {url}: {e}")
|
||
return None
|
||
|
||
def parse_satellite_table(self, html_content):
|
||
"""Распарсить таблицу со спутниками"""
|
||
soup = BeautifulSoup(html_content, "html.parser")
|
||
satellites = []
|
||
table = soup.find("table")
|
||
if not table:
|
||
print("Таблица не найдена")
|
||
return satellites
|
||
|
||
rows = table.find_all("tr")[1:]
|
||
|
||
for row in rows:
|
||
cols = row.find_all("td")
|
||
if len(cols) < 13:
|
||
continue
|
||
|
||
try:
|
||
position_cell = cols[0].text.strip()
|
||
position_match = re.search(r"([\d\.]+)°([EW])", position_cell)
|
||
if position_match:
|
||
position_value = position_match.group(1)
|
||
position_direction = position_match.group(2)
|
||
position = f"{position_value}{position_direction}"
|
||
else:
|
||
position = None
|
||
|
||
# Название спутника (2-я колонка)
|
||
satellite_cell = cols[1]
|
||
satellite_name = satellite_cell.get_text(strip=True)
|
||
# Удаляем возможные лишние символы или пробелы
|
||
satellite_name = re.sub(r"\s+", " ", satellite_name).strip()
|
||
|
||
# NORAD (3-я колонка)
|
||
norad = cols[2].text.strip()
|
||
if not norad or norad == "-":
|
||
norad = None
|
||
|
||
ini_link = None
|
||
ini_cell = cols[3]
|
||
ini_img = ini_cell.find("img", src=lambda x: x and "disquette.gif" in x)
|
||
if ini_img and position:
|
||
ini_link = f"https://ru.kingofsat.net/dl.php?pos={position}&fkhz=0"
|
||
|
||
update_date = cols[12].text.strip() if len(cols) > 12 else None
|
||
|
||
if satellite_name and ini_link and position:
|
||
satellites.append(
|
||
{
|
||
"position": position,
|
||
"name": satellite_name,
|
||
"norad": norad,
|
||
"ini_url": ini_link,
|
||
"update_date": update_date,
|
||
}
|
||
)
|
||
|
||
except Exception as e:
|
||
print(f"Ошибка при обработке строки таблицы: {e}")
|
||
continue
|
||
|
||
return satellites
|
||
|
||
def parse_ini_file(self, ini_content):
|
||
"""Распарсить содержимое .ini файла"""
|
||
data = {"metadata": {}, "sattype": {}, "dvb": {}}
|
||
|
||
# # Извлекаем метаданные из комментариев
|
||
# metadata_match = re.search(r'\[ downloaded from www\.kingofsat\.net \(c\) (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \]', ini_content)
|
||
# if metadata_match:
|
||
# data['metadata']['downloaded'] = metadata_match.group(1)
|
||
|
||
# Парсим секцию [SATTYPE]
|
||
sattype_match = re.search(r"\[SATTYPE\](.*?)\n\[", ini_content, re.DOTALL)
|
||
if sattype_match:
|
||
sattype_content = sattype_match.group(1).strip()
|
||
for line in sattype_content.split("\n"):
|
||
line = line.strip()
|
||
if "=" in line:
|
||
key, value = line.split("=", 1)
|
||
data["sattype"][key.strip()] = value.strip()
|
||
|
||
# Парсим секцию [DVB]
|
||
dvb_match = re.search(r"\[DVB\](.*?)(?:\n\[|$)", ini_content, re.DOTALL)
|
||
if dvb_match:
|
||
dvb_content = dvb_match.group(1).strip()
|
||
for line in dvb_content.split("\n"):
|
||
line = line.strip()
|
||
if "=" in line:
|
||
key, value = line.split("=", 1)
|
||
params = [p.strip() for p in value.split(",")]
|
||
polarization = params[1] if len(params) > 1 else ""
|
||
if polarization:
|
||
polarization = self.convert_polarization(polarization)
|
||
|
||
data["dvb"][key.strip()] = {
|
||
"frequency": params[0] if len(params) > 0 else "",
|
||
"polarization": polarization,
|
||
"symbol_rate": params[2] if len(params) > 2 else "",
|
||
"fec": params[3] if len(params) > 3 else "",
|
||
"standard": params[4] if len(params) > 4 else "",
|
||
"modulation": params[5] if len(params) > 5 else "",
|
||
}
|
||
|
||
return data
|
||
|
||
def download_ini_file(self, url):
|
||
"""Скачать содержимое .ini файла"""
|
||
try:
|
||
response = self.session.get(url, timeout=30)
|
||
response.raise_for_status()
|
||
return response.text
|
||
except Exception as e:
|
||
print(f"Ошибка при скачивании .ini файла {url}: {e}")
|
||
return None
|
||
|
||
def get_all_satellites_data(self):
|
||
"""Получить данные всех спутников с учетом ограничения max_satellites"""
|
||
html_content = self.fetch_page(self.base_url + "/satellites")
|
||
if not html_content:
|
||
return []
|
||
|
||
satellites = self.parse_satellite_table(html_content)
|
||
|
||
if self.max_satellites > 0 and len(satellites) > self.max_satellites:
|
||
satellites = satellites[: self.max_satellites]
|
||
|
||
results = []
|
||
processed_count = 0
|
||
|
||
for satellite in satellites:
|
||
print(f"Обработка спутника: {satellite['name']} ({satellite['position']})")
|
||
|
||
ini_content = self.download_ini_file(satellite["ini_url"])
|
||
if not ini_content:
|
||
print(f"Не удалось скачать .ini файл для {satellite['name']}")
|
||
continue
|
||
|
||
parsed_ini = self.parse_ini_file(ini_content)
|
||
|
||
result = {
|
||
"satellite_name": satellite["name"],
|
||
"position": satellite["position"],
|
||
"norad": satellite["norad"],
|
||
"update_date": satellite["update_date"],
|
||
"ini_url": satellite["ini_url"],
|
||
"ini_data": parsed_ini,
|
||
}
|
||
|
||
results.append(result)
|
||
processed_count += 1
|
||
|
||
if self.max_satellites > 0 and processed_count >= self.max_satellites:
|
||
break
|
||
|
||
time.sleep(1)
|
||
|
||
return results
|
||
|
||
def create_satellite_dict(self, satellites_data):
|
||
"""Создать словарь с данными спутников"""
|
||
satellite_dict = {}
|
||
|
||
for data in satellites_data:
|
||
key = f"{data['position']}_{data['satellite_name'].replace(' ', '_').replace('/', '_')}"
|
||
satellite_dict[key] = {
|
||
"name": data["satellite_name"],
|
||
"position": data["position"],
|
||
"norad": data["norad"],
|
||
"update_date": data["update_date"],
|
||
"ini_url": data["ini_url"],
|
||
"transponders_count": len(data["ini_data"]["dvb"]),
|
||
"transponders": data["ini_data"]["dvb"],
|
||
"sattype_info": data["ini_data"]["sattype"],
|
||
"metadata": data["ini_data"]["metadata"],
|
||
}
|
||
|
||
return satellite_dict
|