This repository has been archived by the owner on Feb 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathdatabase_manager.py
117 lines (92 loc) · 3.26 KB
/
database_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# -*- coding: utf-8 -*-
from sqlalchemy import ForeignKey
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import Column
from sqlalchemy import Integer, String
from sqlalchemy.ext.declarative import declarative_base
'''
SQLAlchemy-related classes and methods.
'''
class Database:
""" """
def __init__(self, database_path=None, debug_mode=False):
self.database = database_path
self.debug_mode = debug_mode
self.start_engine()
# self.clear_all_tables()
def start_engine(self):
# CONNECT
if not self.database:
self.database = 'data.db'
self.engine = create_engine('sqlite:///' + self.database, echo=False)
self.connection = self.engine.connect()
# Create Session for sqlalchemy ...
Session = sessionmaker(bind=self.engine)
self.session = Session()
Base.metadata.create_all(self.engine)
def clear_all_tables(self):
for table in reversed(Base.metadata.sorted_tables):
self.connection.execute(table.delete())
self.session.commit()
def connection_close(self):
self.connection.close()
def commit(self):
self.session.commit()
def get_product_details(self, slug):
# print(slug)
data = self.session.query(
Product.id, Product.slug).filter(
Product.name == slug).first()
# print(data)
return data
def get_release_id(self, product_slug, version):
# print(product_slug)
# print(version)
data = self.session.query(
Release.id).filter(
Release.product_slug == product_slug).filter(
Release.version == version.strip()).first()
# print(data)
return data
def get_file_details(self, release_id, file_name):
# print(release_id)
# print(file_name)
data = self.session.query(
ProductFile.id,
ProductFile.release_id,
ProductFile.filename,
ProductFile.download_url,
ProductFile.md5,
ProductFile.release_date).filter(
ProductFile.release_id == release_id).filter(
ProductFile.filename == file_name).first()
# print(data)
return data
def check_file_exists(self, file):
return self.session.query(
ProductFile.filename).filter(
ProductFile.filename == file).first()
Base = declarative_base()
class Product(Base):
__tablename__ = 'product'
id = Column(Integer, primary_key=True)
slug = Column(String)
name = Column(String)
file_groups_url = Column(String)
product_files_url = Column(String)
class Release(Base):
__tablename__ = 'release'
id = Column(Integer, primary_key=True)
product_slug = Column(String, ForeignKey('product.slug'))
version = Column(String)
file_groups_url = Column(String)
product_files_url = Column(String)
class ProductFile(Base):
__tablename__ = 'product_file'
id = Column(Integer, primary_key=True)
release_id = Column(Integer, ForeignKey('release.id'), primary_key=True)
filename = Column(String)
download_url = Column(String)
md5 = Column(String)
release_date = Column(String)