import re
import time
from datetime import date, datetime
from pprint import pprint

import requests
from bs4 import BeautifulSoup


class LyngSatParser:
    """Parser for satellite data from LyngSat (requires a running FlareSolverr instance)."""

    def __init__(
        self,
        flaresolver_url: str = "http://localhost:8191/v1",
        regions: list[str] | None = None,
        target_sats: list[str] | None = None,
    ):
        """
        :param flaresolver_url: URL of the FlareSolverr v1 API endpoint.
        :param regions: LyngSat region page names to scan; defaults to all four.
        :param target_sats: optional whitelist of satellite names
            (matched case-insensitively, whitespace-stripped).
        """
        self.flaresolver_url = flaresolver_url
        # BUG FIX: the original guarded this with `if regions`, so the whitelist
        # was silently dropped when `regions` was omitted, and `map` over None
        # crashed when `regions` was given without `target_sats`.
        self.target_sats = (
            [sat.strip().lower() for sat in target_sats] if target_sats else None
        )
        self.regions = regions if regions else ["europe", "asia", "america", "atlantic"]
        self.BASE_URL = "https://www.lyngsat.com"

    def parse_metadata(self, metadata: str) -> dict:
        """Split a transponder metadata string (e.g. 'DVB-S2 8PSK 27500 3/4')
        into its parts.

        :returns: dict with 'standard', 'modulation', 'symbol_rate' (int) and
            'fec' keys; any part that cannot be recognized is None.
        """
        if not metadata or not metadata.strip():
            return {
                'standard': None,
                'modulation': None,
                'symbol_rate': None,
                'fec': None
            }
        # All whitespace is stripped so the fields can be peeled off positionally.
        normalized = re.sub(r'\s+', '', metadata.strip())
        # FEC is a trailing fraction such as 3/4.
        fec_match = re.search(r'([1-9]/[1-9])$', normalized)
        fec = fec_match.group(1) if fec_match else None
        core = normalized[:fec_match.start()] if fec_match else normalized
        std_match = re.match(r'(DVB-S2?|ABS-S|DVB-T2?|ATSC|ISDB)', core)
        standard = std_match.group(1) if std_match else None
        rest = core[len(standard):] if standard else core
        modulation = None
        mod_match = re.match(r'(8PSK|QPSK|16APSK|32APSK|64QAM|256QAM|BPSK)', rest)
        if mod_match:
            modulation = mod_match.group(1)
            rest = rest[len(modulation):]
        symbol_rate = None
        sr_match = re.search(r'(\d+)$', rest)
        if sr_match:
            try:
                symbol_rate = int(sr_match.group(1))
            except ValueError:
                pass
        return {
            'standard': standard,
            'modulation': modulation,
            'symbol_rate': symbol_rate,
            'fec': fec
        }

    def extract_date(self, s: str) -> date | None:
        """Extract a trailing YYMMDD date from *s*.

        :returns: the parsed ``date`` or None when absent/invalid.
            (Original annotation said ``datetime``; the code returns ``.date()``.)
        """
        match = re.search(r'(\d{6})$', s.strip())
        if not match:
            return None
        try:
            return datetime.strptime(match.group(1), '%y%m%d').date()
        except ValueError:
            return None

    def convert_polarization(self, polarization: str) -> str:
        """Convert a one-letter polarization code to its Russian display name.

        Unknown codes are returned unchanged.
        """
        polarization_map = {
            'V': 'Вертикальная',
            'H': 'Горизонтальная',
            'R': 'Правая',
            'L': 'Левая'
        }
        return polarization_map.get(polarization.upper(), polarization)

    def get_region_pages(self, regions: list[str] | None = None) -> list[str]:
        """Fetch the HTML of each region overview page through FlareSolverr.

        Regions that fail with a non-200 HTTP status are skipped silently.
        """
        if regions is None:
            regions = self.regions
        html_regions = []
        for region in regions:
            payload = {
                "cmd": "request.get",
                "url": f"{self.BASE_URL}/{region}.html",
                "maxTimeout": 60000
            }
            # timeout= guards against a hung FlareSolverr; maxTimeout above is
            # FlareSolverr's own browser-side limit (60 s), so 90 s suffices.
            response = requests.post(self.flaresolver_url, json=payload, timeout=90)
            if response.status_code != 200:
                continue
            html_regions.append(
                response.json().get("solution", {}).get("response", "")
            )
            print(f"Обработал страницу по {region}")
        return html_regions

    def _iter_satellite_rows(self, region_page: str):
        """Yield ``(name, relative_url, row)`` for every satellite row on a
        region page, applying the ``target_sats`` whitelist when set.

        Shared by :meth:`get_satellite_urls` and :meth:`get_satellites_data`,
        which previously duplicated this parsing verbatim.
        """
        soup = BeautifulSoup(region_page, "html.parser")
        col_table = soup.find_all("div", class_="desktab")[0]
        tables = col_table.find_next_sibling('table').find_all('table')
        for table in tables:
            for tr in table.find_all('tr'):
                span = tr.find('span')
                if span is None:
                    # header/separator rows carry no satellite name
                    continue
                sat_name = span.text
                if (self.target_sats is not None
                        and sat_name.strip().lower() not in self.target_sats):
                    continue
                links = tr.find_all('a')
                # Rows normally carry three links; the satellite page is the
                # third, but sparse rows only have one.
                try:
                    sat_url = links[2]['href']
                except IndexError:
                    sat_url = links[0]['href']
                yield sat_name, sat_url, tr

    def get_satellite_urls(self, html_regions: list[str]):
        """Collect satellite names and their relative page URLs.

        :returns: two parallel lists ``(names, urls)``.
        """
        sat_names = []
        sat_urls = []
        for region_page in html_regions:
            for sat_name, sat_url, _tr in self._iter_satellite_rows(region_page):
                sat_names.append(sat_name)
                sat_urls.append(sat_url)
        return sat_names, sat_urls

    def get_satellites_data(self) -> dict[str, dict]:
        """Fetch and parse the transponder tables of every matching satellite.

        :returns: mapping of satellite name to a dict with 'url',
            'update_date' (``date``) and 'sources' (transponder list).
        """
        sat_data = {}
        for region_page in self.get_region_pages(self.regions):
            for sat_name, sat_url, tr in self._iter_satellite_rows(region_page):
                update_date = tr.find_all('td')[-1].text
                sat_response = requests.post(
                    self.flaresolver_url,
                    json={
                        "cmd": "request.get",
                        "url": f"{self.BASE_URL}/{sat_url}",
                        "maxTimeout": 60000
                    },
                    timeout=90,
                )
                html_content = (
                    sat_response.json().get("solution", {}).get("response", "")
                )
                sat_data[sat_name] = {
                    "url": f"{self.BASE_URL}/{sat_url}",
                    "update_date": datetime.strptime(update_date, "%y%m%d").date(),
                    "sources": self.get_satellite_content(html_content),
                }
        return sat_data

    def get_satellite_content(self, html_content: str) -> list[dict]:
        """Parse a satellite page into a list of transponder dicts.

        Each entry holds 'freq', 'pol', 'metadata', 'provider_name' and
        'last_update'. (Original annotation said ``dict``; a list is returned.)
        """
        sat_soup = BeautifulSoup(html_content, "html.parser")
        big_table = sat_soup.find('table', class_='bigtable')
        # the trailing desktab div is not a transponder table
        all_tables = big_table.find_all("div", class_="desktab")[:-1]
        data = []
        for table in all_tables:
            trs = table.find_next_sibling('table').find_all('tr')
            for idx, tr in enumerate(trs):
                tds = tr.find_all('td')
                # first two rows are headers; short rows are separators
                if len(tds) < 9 or idx < 2:
                    continue
                # frequency and polarization are joined by a non-breaking space
                freq, polarization = tds[0].find('b').text.strip().split('\xa0')
                data.append({
                    "freq": freq,
                    "pol": self.convert_polarization(polarization),
                    "metadata": self.parse_metadata(tds[1].text),
                    "provider_name": tds[3].text,
                    "last_update": self.extract_date(tds[-1].text)
                })
        return data


class KingOfSatParser:
    """Parser for satellite/transponder data from ru.kingofsat.net."""

    def __init__(self, base_url="https://ru.kingofsat.net", max_satellites=0):
        """
        :param base_url: site base URL.
        :param max_satellites: maximum number of satellites to process (0 = all).
        """
        self.base_url = base_url
        self.max_satellites = max_satellites
        self.session = requests.Session()
        # A desktop browser UA — the site may reject default python-requests UAs.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    def convert_polarization(self, polarization):
        """Convert a one-letter polarization code to its Russian display name."""
        polarization_map = {
            'V': 'Вертикальная',
            'H': 'Горизонтальная',
            'R': 'Правая',
            'L': 'Левая'
        }
        return polarization_map.get(polarization.upper(), polarization)

    def fetch_page(self, url):
        """Fetch a page; return its text or None on any request error."""
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"Ошибка при получении страницы {url}: {e}")
            return None

    def parse_satellite_table(self, html_content):
        """Parse the satellite listing table.

        :returns: list of dicts with 'position', 'name', 'norad', 'ini_url'
            and 'update_date'. Rows without a name, position or .ini download
            link are skipped.
        """
        soup = BeautifulSoup(html_content, 'html.parser')
        satellites = []
        table = soup.find('table')
        if not table:
            print("Таблица не найдена")
            return satellites
        rows = table.find_all('tr')[1:]  # skip header row
        for row in rows:
            cols = row.find_all('td')
            if len(cols) < 13:
                continue
            try:
                # Orbital position, e.g. "13.0°E" -> "13.0E"
                position_match = re.search(r'([\d\.]+)°([EW])', cols[0].text.strip())
                if position_match:
                    position = position_match.group(1) + position_match.group(2)
                else:
                    position = None

                # Satellite name (2nd column), whitespace collapsed
                satellite_name = cols[1].get_text(strip=True)
                satellite_name = re.sub(r'\s+', ' ', satellite_name).strip()

                # NORAD id (3rd column); "-" means unknown
                norad = cols[2].text.strip()
                if not norad or norad == "-":
                    norad = None

                # The disquette icon marks a downloadable .ini file
                ini_link = None
                ini_img = cols[3].find(
                    'img', src=lambda x: x and 'disquette.gif' in x
                )
                if ini_img and position:
                    ini_link = f"https://ru.kingofsat.net/dl.php?pos={position}&fkhz=0"

                # len(cols) >= 13 is guaranteed by the guard above
                update_date = cols[12].text.strip()

                if satellite_name and ini_link and position:
                    satellites.append({
                        'position': position,
                        'name': satellite_name,
                        'norad': norad,
                        'ini_url': ini_link,
                        'update_date': update_date
                    })
            except Exception as e:
                print(f"Ошибка при обработке строки таблицы: {e}")
                continue
        return satellites

    def parse_ini_file(self, ini_content):
        """Parse a KingOfSat .ini file's [SATTYPE] and [DVB] sections.

        :returns: dict with 'metadata' (currently unused, kept for the output
            schema), 'sattype' (key -> value) and 'dvb' (transponder number ->
            dict of frequency/polarization/symbol_rate/fec/standard/modulation).
        """
        data = {
            'metadata': {},
            'sattype': {},
            'dvb': {}
        }

        # [SATTYPE] section: simple key=value lines up to the next section
        sattype_match = re.search(r'\[SATTYPE\](.*?)\n\[', ini_content, re.DOTALL)
        if sattype_match:
            for line in sattype_match.group(1).strip().split('\n'):
                line = line.strip()
                if '=' in line:
                    key, value = line.split('=', 1)
                    data['sattype'][key.strip()] = value.strip()

        # [DVB] section: each value is a comma-separated transponder record
        dvb_match = re.search(r'\[DVB\](.*?)(?:\n\[|$)', ini_content, re.DOTALL)
        if dvb_match:
            for line in dvb_match.group(1).strip().split('\n'):
                line = line.strip()
                if '=' not in line:
                    continue
                key, value = line.split('=', 1)
                params = [p.strip() for p in value.split(',')]
                polarization = params[1] if len(params) > 1 else ''
                if polarization:
                    polarization = self.convert_polarization(polarization)
                data['dvb'][key.strip()] = {
                    'frequency': params[0] if len(params) > 0 else '',
                    'polarization': polarization,
                    'symbol_rate': params[2] if len(params) > 2 else '',
                    'fec': params[3] if len(params) > 3 else '',
                    'standard': params[4] if len(params) > 4 else '',
                    'modulation': params[5] if len(params) > 5 else ''
                }
        return data

    def download_ini_file(self, url):
        """Download a .ini file; return its text or None on any request error."""
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"Ошибка при скачивании .ini файла {url}: {e}")
            return None

    def get_all_satellites_data(self):
        """Download and parse data for all satellites, honoring max_satellites.

        Sleeps one second between downloads to stay polite to the server.
        """
        html_content = self.fetch_page(self.base_url + '/satellites')
        if not html_content:
            return []
        satellites = self.parse_satellite_table(html_content)
        if self.max_satellites > 0 and len(satellites) > self.max_satellites:
            satellites = satellites[:self.max_satellites]

        results = []
        processed_count = 0
        for satellite in satellites:
            print(f"Обработка спутника: {satellite['name']} ({satellite['position']})")
            ini_content = self.download_ini_file(satellite['ini_url'])
            if not ini_content:
                print(f"Не удалось скачать .ini файл для {satellite['name']}")
                continue
            results.append({
                'satellite_name': satellite['name'],
                'position': satellite['position'],
                'norad': satellite['norad'],
                'update_date': satellite['update_date'],
                'ini_url': satellite['ini_url'],
                'ini_data': self.parse_ini_file(ini_content)
            })
            processed_count += 1
            if self.max_satellites > 0 and processed_count >= self.max_satellites:
                break
            time.sleep(1)
        return results

    def create_satellite_dict(self, satellites_data):
        """Re-key parsed satellite data as '<position>_<name>' -> summary dict."""
        satellite_dict = {}
        for data in satellites_data:
            key = (
                f"{data['position']}_"
                f"{data['satellite_name'].replace(' ', '_').replace('/', '_')}"
            )
            satellite_dict[key] = {
                'name': data['satellite_name'],
                'position': data['position'],
                'norad': data['norad'],
                'update_date': data['update_date'],
                'ini_url': data['ini_url'],
                'transponders_count': len(data['ini_data']['dvb']),
                'transponders': data['ini_data']['dvb'],
                'sattype_info': data['ini_data']['sattype'],
                'metadata': data['ini_data']['metadata']
            }
        return satellite_dict


if __name__ == "__main__":
    # Previously this ran unconditionally at import time, triggering network
    # calls whenever the module was imported.
    lyngsat = LyngSatParser(regions=['europe'],
                            target_sats=['Türksat 3A', 'Intelsat 22'])
    html_regions = lyngsat.get_region_pages()
    pprint(lyngsat.get_satellite_urls(html_regions))