Вся информация представлена исключительно в демонстрационных целях
Общие сведения: Это программа для сбора информации из социальной сети ВКонтакте (VK). Она получает данные о группах, их публикациях, комментариях и сохраняет всё в базу данных на компьютере.
Структура программы:
vk_parse.py (главный файл) ├── config.py (файл с настройками - токен доступа) ├── vk_data.db (база данных, создается автоматически) └── export/ (папка для выгрузки данных в CSV)
Ключевые понятия:
- Токен доступа — это уникальный идентификатор, который используется для аутентификации пользователя или приложения при доступе к защищенным ресурсам API (как получить токен описано здесь и здесь).
- API VK — это интерфейс, который позволяет получать информацию из базы данных vk.com с помощью HTTP-запросов к специальному серверу. Синтаксис запросов и тип возвращаемых ими данных строго определены на стороне самого сервиса.
- SQLite база данных — встраиваемая реляционная система управления базами данных (СУБД). В отличие от классических СУБД вроде MySQL или PostgreSQL, которые функционируют как отдельные серверные приложения, SQLite интегрируется непосредственно в код программы.
- Домен группы — короткое название страницы в VK (например,
club12345илиomsk).
Пошаговый разбор кода
Шаг 1. Подготовка (импорт инструментов)
import csv import time import requests import json import datetime import sqlite3 from typing import Dict, List, Any, Optional, Tuple from urllib.parse import urlparse import os import re import config
Что это делает:
csv— для сохранения данных в таблицы Excelrequests— для отправки запросов к серверам VKsqlite3— для работы с базой данныхdatetime— для работы с датами и временемconfig— импорт настроек (токен доступа)
Шаг 2. Создание главного класса VKParser
class VKParser:
def __init__(self, token: str, version: float = 5.199):
Шаг 3. Создание структуры базы данных
Таблица 1: groups — информация о группах
CREATE TABLE groups (
group_id INTEGER PRIMARY KEY, -- Уникальный номер группы
screen_name TEXT UNIQUE, -- Короткое имя (например, "club123")
name TEXT, -- Название группы
description TEXT, -- Описание группы
members_count INTEGER, -- Количество участников
parsed_at TIMESTAMP -- Когда собрана информация
)
Таблица 2: posts — публикации в группах
CREATE TABLE posts (
post_id INTEGER, -- Номер публикации
group_id INTEGER, -- К какой группе относится
owner_id INTEGER, -- Кто автор
date TIMESTAMP, -- Дата публикации
text TEXT, -- Текст поста
likes_count INTEGER, -- Количество лайков
reposts_count INTEGER, -- Количество репостов
views_count INTEGER, -- Количество просмотров
comments_count INTEGER, -- Количество комментариев
post_url TEXT, -- Ссылка на пост
has_attachments BOOLEAN -- Есть ли вложения (фото, видео)
)
Таблица 3: attachments — вложения к постам
CREATE TABLE attachments (
attachment_id INTEGER, -- Уникальный номер вложения
post_id INTEGER, -- К какому посту относится
attachment_type TEXT, -- Тип (фото, видео, документ)
url TEXT, -- Ссылка на файл
title TEXT, -- Заголовок
description TEXT -- Описание
)
Таблица 4: comments — комментарии к постам
CREATE TABLE comments (
comment_id INTEGER, -- Номер комментария
post_id INTEGER, -- К какому посту
text TEXT, -- Текст комментария
likes_count INTEGER, -- Лайки у комментария
thread_count INTEGER -- Количество ответов на комментарий
)
Таблица 5: comment_threads — ответы на комментарии
CREATE TABLE comment_threads (
thread_id INTEGER, -- Номер ответа
comment_id INTEGER, -- К какому комментарию
text TEXT, -- Текст ответа
from_id INTEGER -- Кто написал ответ
)
Шаг 4. Получение данных из VK
# 1. Формируем запрос
params = {
'access_token': 'ваш_токен', # Ключ доступа
'v': 5.199, # Версия API
'group_id': 'club12345', # Имя группы
'fields': 'description,members_count' # Какие данные нужны
}
# 2. Отправляем запрос к серверу VK
response = requests.get(
'https://api.vk.com/method/groups.getById',
params=params
)
# 3. Получаем ответ в формате JSON
data = response.json()
Пример ответа:
{
"response": [{
"id": 12345,
"name": "Название группы",
"screen_name": "club12345",
"description": "Описание...",
"members_count": 1000
}]
}
Шаг 5. Парсим посты
Алгоритм работы:
- Получить список постов → запрос к
wall.get - Проверить каждый пост → уже есть в базе?
- Если нет → сохранить текст, статистику, ссылку
- Проверить вложения → фото, видео, документы
- Если нужно → получить комментарии к посту
- Сохранить всё → в базу данных
Важная особенность: программа проверяет дубликаты, чтобы не сохранять одно и то же дважды.
Шаг 6. Обработка комментариев
Пост ├── Комментарий 1 │ ├── Ответ 1 на комментарий 1 │ └── Ответ 2 на комментарий 1 ├── Комментарий 2 │ └── Ответ 1 на комментарий 2 └── Комментарий 3
Программа сохраняет:
- Основные комментарии → таблица
comments - Ответы на комментарии → таблица
comment_threads
Полный код
# vk_parse.py
"""
Модуль для парсинга публичных страниц и групп VK с сохранением в базу данных SQLite
"""
import csv
import time
import requests
import json
import datetime
import sqlite3
from typing import Dict, List, Any, Optional, Tuple
from urllib.parse import urlparse
import os
import re
# импорт файла конфигурации
import config
class VKParser:
"""Класс для парсинга данных из VK"""
def __init__(self, token: str, version: float = 5.199):
"""
Инициализация парсера
Args:
token: Токен доступа VK API
version: Версия API VK
"""
self.token = token
self.version = version
self.session = requests.Session()
self.db = None
def connect_database(self, db_path: str = "vk_data.db"):
"""
Подключение к базе данных SQLite
Args:
db_path: Путь к файлу базы данных
"""
try:
self.db = sqlite3.connect(db_path, check_same_thread=False)
self.db.row_factory = sqlite3.Row
self._create_tables()
print(f"База данных подключена: {db_path}")
except sqlite3.Error as e:
print(f"Ошибка подключения к базе данных: {e}")
raise
def _create_tables(self):
"""Создание таблиц в базе данных"""
cursor = self.db.cursor()
# Таблица для групп/пабликов
cursor.execute('''
CREATE TABLE IF NOT EXISTS groups (
group_id INTEGER PRIMARY KEY,
screen_name TEXT UNIQUE,
name TEXT,
description TEXT,
members_count INTEGER,
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Таблица для постов
cursor.execute('''
CREATE TABLE IF NOT EXISTS posts (
post_id INTEGER,
group_id INTEGER,
owner_id INTEGER,
from_id INTEGER,
date TIMESTAMP,
text TEXT,
likes_count INTEGER DEFAULT 0,
reposts_count INTEGER DEFAULT 0,
views_count INTEGER DEFAULT 0,
comments_count INTEGER DEFAULT 0,
post_url TEXT,
has_attachments BOOLEAN DEFAULT 0,
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (post_id, owner_id),
FOREIGN KEY (group_id) REFERENCES groups(group_id)
)
''')
# Таблица для вложений
cursor.execute('''
CREATE TABLE IF NOT EXISTS attachments (
attachment_id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER,
owner_id INTEGER,
attachment_type TEXT,
url TEXT,
title TEXT,
description TEXT,
duration INTEGER,
size INTEGER,
FOREIGN KEY (post_id, owner_id) REFERENCES posts(post_id, owner_id),
UNIQUE(post_id, owner_id, attachment_type, url)
)
''')
# Таблица для комментариев
cursor.execute('''
CREATE TABLE IF NOT EXISTS comments (
comment_id INTEGER,
post_id INTEGER,
owner_id INTEGER,
from_id INTEGER,
date TIMESTAMP,
text TEXT,
likes_count INTEGER DEFAULT 0,
thread_count INTEGER DEFAULT 0,
reply_to_comment INTEGER,
reply_to_user INTEGER,
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (comment_id, post_id, owner_id),
FOREIGN KEY (post_id, owner_id) REFERENCES posts(post_id, owner_id)
)
''')
# Таблица для тредов (ответы на комментарии)
cursor.execute('''
CREATE TABLE IF NOT EXISTS comment_threads (
thread_id INTEGER,
comment_id INTEGER,
post_id INTEGER,
owner_id INTEGER,
from_id INTEGER,
date TIMESTAMP,
text TEXT,
likes_count INTEGER DEFAULT 0,
parsed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (thread_id, comment_id, post_id, owner_id),
FOREIGN KEY (comment_id, post_id, owner_id)
REFERENCES comments(comment_id, post_id, owner_id)
)
''')
self.db.commit()
def is_post_exists(self, post_id: int, owner_id: int) -> bool:
"""
Проверка существования поста в базе данных
Args:
post_id: ID поста
owner_id: ID владельца поста
Returns:
True если пост существует
"""
cursor = self.db.cursor()
cursor.execute(
"SELECT 1 FROM posts WHERE post_id = ? AND owner_id = ?",
(post_id, owner_id)
)
return cursor.fetchone() is not None
def is_comment_exists(self, comment_id: int, post_id: int, owner_id: int) -> bool:
"""
Проверка существования комментария в базе данных
Args:
comment_id: ID комментария
post_id: ID поста
owner_id: ID владельца поста
Returns:
True если комментарий существует
"""
cursor = self.db.cursor()
cursor.execute(
"SELECT 1 FROM comments WHERE comment_id = ? AND post_id = ? AND owner_id = ?",
(comment_id, post_id, owner_id)
)
return cursor.fetchone() is not None
def is_thread_exists(self, thread_id: int, comment_id: int, post_id: int, owner_id: int) -> bool:
"""
Проверка существования треда в базе данных
Args:
thread_id: ID треда
comment_id: ID комментария
post_id: ID поста
owner_id: ID владельца поста
Returns:
True если тред существует
"""
cursor = self.db.cursor()
cursor.execute(
"SELECT 1 FROM comment_threads WHERE thread_id = ? AND comment_id = ? AND post_id = ? AND owner_id = ?",
(thread_id, comment_id, post_id, owner_id)
)
return cursor.fetchone() is not None
def get_group_info(self, domain: str) -> Optional[Dict]:
"""
Получение информации о группе/паблике
Args:
domain: Короткое имя или ID группы
Returns:
Словарь с информацией о группе или None в случае ошибки
"""
try:
# Пытаемся получить информацию по короткому имени
params = {
'access_token': self.token,
'v': self.version,
'group_id': domain,
'fields': 'description,members_count'
}
response = self.session.get(
'https://api.vk.com/method/groups.getById',
params=params
)
if response.status_code == 200:
data = response.json()
if 'response' in data and len(data['response']) > 0:
group = data['response'][0]
return {
'group_id': abs(group['id']), # VK возвращает отрицательные ID для групп
'screen_name': group.get('screen_name', ''),
'name': group.get('name', ''),
'description': group.get('description', ''),
'members_count': group.get('members_count', 0)
}
print(f"Не удалось получить информацию о группе: {domain}")
return None
except Exception as e:
print(f"Ошибка при получении информации о группе: {e}")
return None
def save_group_info(self, group_info: Dict) -> int:
"""
Сохранение информации о группе в базу данных
Args:
group_info: Информация о группе
Returns:
ID группы в базе данных
"""
cursor = self.db.cursor()
try:
cursor.execute('''
INSERT OR REPLACE INTO groups
(group_id, screen_name, name, description, members_count)
VALUES (?, ?, ?, ?, ?)
''', (
group_info['group_id'],
group_info['screen_name'],
group_info['name'],
group_info['description'],
group_info['members_count']
))
self.db.commit()
print(f"Информация о группе сохранена: {group_info['name']}")
return group_info['group_id']
except sqlite3.Error as e:
print(f"Ошибка при сохранении информации о группе: {e}")
self.db.rollback()
return -1
def parse_posts(self, domain: str, post_count: int = 10,
comments_count: int = 10, offset: int = 0) -> Dict:
"""
Парсинг постов из группы с проверкой дубликатов
Args:
domain: Короткое имя или ID группы
post_count: Количество постов для парсинга
comments_count: Количество комментариев к каждому посту
offset: Смещение
Returns:
Словарь со статистикой парсинга
"""
stats = {
'posts_total': 0,
'posts_new': 0,
'posts_duplicate': 0,
'comments_total': 0,
'comments_new': 0,
'comments_duplicate': 0,
'threads_total': 0,
'threads_new': 0,
'threads_duplicate': 0,
'attachments_total': 0
}
all_posts = []
remaining = post_count
current_offset = offset
try:
while remaining > 0:
# Максимальное количество постов за один запрос - 100
count = min(100, remaining)
params = {
'access_token': self.token,
'v': self.version,
'domain': domain,
'count': count,
'offset': current_offset,
'extended': 1,
'fields': 'likes,reposts,views,comments'
}
response = self.session.get(
'https://api.vk.com/method/wall.get',
params=params,
timeout=30
)
if response.status_code != 200:
print(f"Ошибка запроса: {response.status_code}")
break
data = response.json()
if 'error' in data:
error_msg = data['error'].get('error_msg', 'Неизвестная ошибка')
print(f"Ошибка VK API: {error_msg}")
break
if 'response' not in data:
print("Некорректный ответ от API")
break
posts = data['response'].get('items', [])
groups = data['response'].get('groups', [])
# Получаем информацию о группе из ответа
if groups:
group_info = groups[0]
group_id = self.save_group_info({
'group_id': abs(group_info['id']),
'screen_name': group_info.get('screen_name', ''),
'name': group_info.get('name', ''),
'description': group_info.get('description', ''),
'members_count': group_info.get('members_count', 0)
})
for post in posts:
stats['posts_total'] += 1
# Проверяем, существует ли уже пост
if self.is_post_exists(post['id'], post['owner_id']):
print(f"Пост {post['id']} уже существует в базе, пропускаем")
stats['posts_duplicate'] += 1
continue
# Сохраняем пост (даже если есть вложения большого размера, текст сохраняется)
post_saved = self.save_post(post)
if post_saved:
stats['posts_new'] += 1
print(f"Пост {post['id']} сохранен")
# Сохраняем вложения (если они есть)
if 'attachments' in post:
attachments_saved = self.save_attachments(post['attachments'],
post['id'],
post['owner_id'])
stats['attachments_total'] += attachments_saved
# Парсим комментарии для поста (если требуется и пост новый)
if comments_count > 0 and post.get('comments', {}).get('count', 0) > 0 and post_saved:
comment_stats = self.parse_and_save_comments(
post['owner_id'],
post['id'],
comments_count
)
# Добавляем статистику комментариев
stats['comments_total'] += comment_stats['total']
stats['comments_new'] += comment_stats['new']
stats['comments_duplicate'] += comment_stats['duplicate']
stats['threads_total'] += comment_stats['threads_total']
stats['threads_new'] += comment_stats['threads_new']
stats['threads_duplicate'] += comment_stats['threads_duplicate']
all_posts.extend(posts)
remaining -= len(posts)
current_offset += len(posts)
# Если получено меньше постов, чем запрошено, значит больше нет
if len(posts) < count:
break
# Задержка для избежания лимитов API
time.sleep(0.5)
print(f"Получено постов: {len(all_posts)}")
return stats
except requests.exceptions.RequestException as e:
print(f"Ошибка сети: {e}")
return stats
except Exception as e:
print(f"Ошибка при парсинге постов: {e}")
return stats
def save_post(self, post: Dict) -> bool:
"""
Сохранение поста в базу данных
Args:
post: Данные поста
Returns:
True если пост сохранен успешно, False если ошибка
"""
cursor = self.db.cursor()
try:
# Извлекаем статистику
likes = post.get('likes', {}).get('count', 0)
reposts = post.get('reposts', {}).get('count', 0)
views = post.get('views', {}).get('count', 0)
comments = post.get('comments', {}).get('count', 0)
# Формируем URL поста
owner_id = post['owner_id']
post_id = post['id']
post_url = f"https://vk.com/wall{owner_id}_{post_id}"
# Получаем ID группы (owner_id отрицательный для групп)
group_id = abs(owner_id) if owner_id < 0 else None
cursor.execute('''
INSERT INTO posts
(post_id, group_id, owner_id, from_id, date, text,
likes_count, reposts_count, views_count, comments_count,
post_url, has_attachments)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
post_id,
group_id,
owner_id,
post.get('from_id', owner_id),
datetime.datetime.fromtimestamp(post['date']),
post.get('text', ''),
likes,
reposts,
views,
comments,
post_url,
1 if 'attachments' in post else 0
))
self.db.commit()
return True
except sqlite3.IntegrityError:
# Пост уже существует (хотя мы уже проверяли, но на всякий случай)
print(f"Пост {post['id']} уже существует (нарушение уникальности)")
self.db.rollback()
return False
except sqlite3.Error as e:
print(f"Ошибка при сохранении поста: {e}")
self.db.rollback()
return False
def save_attachments(self, attachments: List[Dict], post_id: int, owner_id: int) -> int:
"""
Сохранение вложений поста
Args:
attachments: Список вложений
post_id: ID поста
owner_id: ID владельца поста
Returns:
Количество сохраненных вложений
"""
cursor = self.db.cursor()
saved_count = 0
for attachment in attachments:
try:
attachment_type = attachment['type']
attachment_data = attachment[attachment_type]
# Обрабатываем разные типы вложений
url = ''
title = ''
description = ''
duration = 0
size = 0
if attachment_type == 'photo':
# Берем самую большую доступную фотографию
sizes = attachment_data.get('sizes', [])
if sizes:
largest = max(sizes, key=lambda x: x.get('width', 0) * x.get('height', 0))
url = largest.get('url', '')
elif attachment_type == 'video':
title = attachment_data.get('title', '')
description = attachment_data.get('description', '')
duration = attachment_data.get('duration', 0)
# Для видео сохраняем ссылку на страницу
video_id = attachment_data.get('id', '')
video_owner_id = attachment_data.get('owner_id', '')
if video_id and video_owner_id:
url = f"https://vk.com/video{video_owner_id}_{video_id}"
elif attachment_type == 'audio':
title = attachment_data.get('title', '')
artist = attachment_data.get('artist', '')
description = f"{artist} - {title}"
duration = attachment_data.get('duration', 0)
url = attachment_data.get('url', '')
elif attachment_type == 'doc':
title = attachment_data.get('title', '')
size = attachment_data.get('size', 0)
url = attachment_data.get('url', '')
elif attachment_type == 'link':
url = attachment_data.get('url', '')
title = attachment_data.get('title', '')
description = attachment_data.get('description', '')
elif attachment_type == 'poll':
title = attachment_data.get('question', '')
# Пытаемся сохранить вложение
try:
cursor.execute('''
INSERT OR IGNORE INTO attachments
(post_id, owner_id, attachment_type, url, title,
description, duration, size)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
post_id,
owner_id,
attachment_type,
url,
title,
description,
duration,
size
))
if cursor.rowcount > 0:
saved_count += 1
except sqlite3.IntegrityError:
# Вложение уже существует, пропускаем
pass
except Exception as e:
print(f"Ошибка при сохранении вложения: {e}")
continue
try:
self.db.commit()
except:
pass
return saved_count
def parse_and_save_comments(self, owner_id: int, post_id: int,
comments_count: int = 10) -> Dict:
"""
Парсинг и сохранение комментариев к посту с проверкой дубликатов
Args:
owner_id: ID владельца поста
post_id: ID поста
comments_count: Количество комментариев
Returns:
Словарь со статистикой комментариев
"""
stats = {
'total': 0,
'new': 0,
'duplicate': 0,
'threads_total': 0,
'threads_new': 0,
'threads_duplicate': 0
}
all_comments = []
offset = 0
try:
while len(all_comments) < comments_count:
count = min(100, comments_count - len(all_comments))
params = {
'access_token': self.token,
'v': self.version,
'owner_id': owner_id,
'post_id': post_id,
'count': count,
'offset': offset,
'thread_items_count': 10, # Количество ответов в тредах
'extended': 1,
'fields': 'likes'
}
response = self.session.get(
'https://api.vk.com/method/wall.getComments',
params=params,
timeout=30
)
if response.status_code != 200:
break
data = response.json()
if 'error' in data:
error_msg = data['error'].get('error_msg', 'Неизвестная ошибка')
print(f"Ошибка при получении комментариев: {error_msg}")
break
comments = data['response'].get('items', [])
all_comments.extend(comments)
offset += len(comments)
# Если комментариев меньше, чем запрошено
if len(comments) < count:
break
time.sleep(0.3)
# Обрабатываем комментарии
for comment in all_comments:
stats['total'] += 1
# Проверяем, существует ли уже комментарий
if self.is_comment_exists(comment['id'], post_id, owner_id):
stats['duplicate'] += 1
continue
# Сохраняем комментарий
if self.save_comment(comment, post_id, owner_id):
stats['new'] += 1
# Обрабатываем треды
if 'thread' in comment and 'items' in comment['thread']:
thread_stats = self.save_threads(
comment['thread']['items'],
comment['id'],
post_id,
owner_id
)
stats['threads_total'] += thread_stats['total']
stats['threads_new'] += thread_stats['new']
stats['threads_duplicate'] += thread_stats['duplicate']
return stats
except Exception as e:
print(f"Ошибка при парсинге комментариев: {e}")
return stats
def save_comment(self, comment: Dict, post_id: int, owner_id: int) -> bool:
"""
Сохранение комментария в базу данных
Args:
comment: Данные комментария
post_id: ID поста
owner_id: ID владельца поста
Returns:
True если комментарий сохранен успешно
"""
cursor = self.db.cursor()
try:
cursor.execute('''
INSERT INTO comments
(comment_id, post_id, owner_id, from_id, date, text,
likes_count, thread_count, reply_to_comment, reply_to_user)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
comment['id'],
post_id,
owner_id,
comment.get('from_id', 0),
datetime.datetime.fromtimestamp(comment['date']),
comment.get('text', ''),
comment.get('likes', {}).get('count', 0),
comment.get('thread', {}).get('count', 0),
comment.get('reply_to_comment', 0),
comment.get('reply_to_user', 0)
))
self.db.commit()
return True
except sqlite3.IntegrityError:
# Комментарий уже существует
self.db.rollback()
return False
except Exception as e:
print(f"Ошибка при сохранении комментария: {e}")
self.db.rollback()
return False
def save_threads(self, threads: List[Dict], comment_id: int, post_id: int, owner_id: int) -> Dict:
"""
Сохранение тредов (ответов на комментарии) с проверкой дубликатов
Args:
threads: Список тредов
comment_id: ID комментария
post_id: ID поста
owner_id: ID владельца поста
Returns:
Словарь со статистикой тредов
"""
stats = {
'total': len(threads),
'new': 0,
'duplicate': 0
}
cursor = self.db.cursor()
for thread in threads:
try:
# Проверяем, существует ли уже тред
if self.is_thread_exists(thread['id'], comment_id, post_id, owner_id):
stats['duplicate'] += 1
continue
cursor.execute('''
INSERT INTO comment_threads
(thread_id, comment_id, post_id, owner_id, from_id,
date, text, likes_count)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
thread['id'],
comment_id,
post_id,
owner_id,
thread.get('from_id', 0),
datetime.datetime.fromtimestamp(thread['date']),
thread.get('text', ''),
thread.get('likes', {}).get('count', 0)
))
stats['new'] += 1
except sqlite3.IntegrityError:
# Тред уже существует
stats['duplicate'] += 1
continue
except Exception as e:
print(f"Ошибка при сохранении треда: {e}")
continue
try:
self.db.commit()
except:
pass
return stats
def close(self):
"""Закрытие соединения с базой данных"""
if self.db:
self.db.close()
print("Соединение с базой данных закрыто")
def export_to_csv(self, output_dir: str = "export"):
"""
Экспорт данных в CSV файлы
Args:
output_dir: Директория для экспорта
"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
tables = ['groups', 'posts', 'attachments', 'comments', 'comment_threads']
for table in tables:
try:
cursor = self.db.cursor()
cursor.execute(f"SELECT * FROM {table}")
rows = cursor.fetchall()
if rows:
# Получаем названия колонок
column_names = [description[0] for description in cursor.description]
# Сохраняем в CSV
csv_file = os.path.join(output_dir, f"{table}.csv")
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(column_names)
writer.writerows(rows)
print(f"Экспортировано {len(rows)} записей в {csv_file}")
except Exception as e:
print(f"Ошибка при экспорте таблицы {table}: {e}")
def get_user_input():
"""
Получение параметров от пользователя
Returns:
Кортеж (domain, post_count, comments_count)
"""
print("\n" + "=" * 50)
print("ПАРСЕР ГРУПП VK С ПРОВЕРКОЙ ДУБЛИКАТОВ")
print("=" * 50)
# Ввод домена
domain = input("\nВведите домен группы для парсинга \n"
"(например: 'club30366949' или 'omsk'): ").strip()
if not domain:
print("Домен не может быть пустым!")
return None
# Ввод количества постов
while True:
try:
post_count = int(input("\nСколько постов нужно спарсить? (1-1000): "))
if 1 <= post_count <= 1000:
break
else:
print("Введите число от 1 до 1000")
except ValueError:
print("Пожалуйста, введите целое число")
# Ввод количества комментариев
while True:
try:
comments_count = int(input("\nСколько комментариев к каждому посту парсить? (0-100): "))
if 0 <= comments_count <= 100:
break
else:
print("Введите число от 0 до 100")
except ValueError:
print("Пожалуйста, введите целое число")
return domain, post_count, comments_count
def main():
"""Основная функция программы"""
try:
# Получаем параметры от пользователя
user_input = get_user_input()
if not user_input:
return
domain, post_count, comments_count = user_input
# Создаем парсер
parser = VKParser(config.token, config.version)
# Подключаем базу данных
parser.connect_database()
# Получаем информацию о группе
print("\nПолучаю информацию о группе...")
group_info = parser.get_group_info(domain)
if group_info:
print(f"Группа: {group_info['name']}")
print(f"Участников: {group_info['members_count']}")
print(f"Описание: {group_info['description'][:100]}...")
# Сохраняем информацию о группе
parser.save_group_info(group_info)
else:
print(f"Не удалось получить информацию о группе {domain}")
# Все равно продолжаем, может быть группа скрыта
# Парсим посты
print(f"\nНачинаю парсинг {post_count} постов...")
start_time = time.time()
stats = parser.parse_posts(
domain=domain,
post_count=post_count,
comments_count=comments_count,
offset=0
)
elapsed_time = time.time() - start_time
# Выводим статистику
print("\n" + "=" * 50)
print("СТАТИСТИКА ПАРСИНГА")
print("=" * 50)
print(f"Группа: {domain}")
print(f"\nПосты:")
print(f" Всего получено: {stats['posts_total']}")
print(f" Новых сохранено: {stats['posts_new']}")
print(f" Дубликатов пропущено: {stats['posts_duplicate']}")
if comments_count > 0:
print(f"\nКомментарии:")
print(f" Всего получено: {stats['comments_total']}")
print(f" Новых сохранено: {stats['comments_new']}")
print(f" Дубликатов пропущено: {stats['comments_duplicate']}")
print(f"\nТреды (ответы на комментарии):")
print(f" Всего получено: {stats['threads_total']}")
print(f" Новых сохранено: {stats['threads_new']}")
print(f" Дубликатов пропущено: {stats['threads_duplicate']}")
print(f"\nВложений сохранено: {stats['attachments_total']}")
print(f"Время выполнения: {elapsed_time:.2f} секунд")
print(f"Данные сохранены в базу: vk_data.db")
print("=" * 50)
# Экспорт в CSV (опционально)
export_csv = input("\nЭкспортировать данные в CSV? (y/n): ").lower()
if export_csv == 'y':
parser.export_to_csv()
print("Данные экспортированы в папку 'export'")
# Закрываем соединение
parser.close()
print("\nПарсинг завершен успешно!")
except KeyboardInterrupt:
print("\n\nПарсинг прерван пользователем")
except Exception as e:
print(f"\nПроизошла ошибка: {e}")
import traceback
traceback.print_exc()
if __name__ == '__main__':
main()