Add backend in python/flask

This commit is contained in:
Wim Bonis 2022-05-08 14:14:15 +02:00
commit c8f40ccac3
20 changed files with 948 additions and 13 deletions

View file

@ -0,0 +1,9 @@
{
"users": {
"1": {
"username": "admin",
"salted_passwort": "$2b$12$leojcBt.FeMsX4i3NjQKQ.MRrI/TVg4sa7T/RI/zLOoDzDeDb4L5u",
"role": "admin"
}
}
}

View file

@ -0,0 +1,29 @@
import os, sys
sys.path.append(os.getcwd())
from untils.db import Zero_DB
from untils.textTransform import HASH
from untils.zero_tier_api import ZEROTIER_REST_API
from untils.zerologger import LogurLogger
from loguru import logger
from vars import DARWIN, LINUX
sys_vars = DARWIN() if os.uname().sysname == "Darwin" else LINUX()
if os.path.isdir(sys_vars.default_config_path) == False:
os.mkdir(sys_vars.default_config_path)
db = Zero_DB()
hash = HASH()
zerotier_api = ZEROTIER_REST_API()
logger = LogurLogger(logger).getlogger()
pw_hash = hash.hash_password(sys_vars.default_admin_creditals.get("passwort"))
if db.read_user_credentials(sys_vars.default_admin_creditals.get("username")) == []:
db.write_user_credentials(**{"username" : sys_vars.default_admin_creditals.get("username"), "salted_passwort" : pw_hash, "role" : "admin"})

View file

@ -0,0 +1,26 @@
[
{
"type": "MATCH_ETHERTYPE",
"not": true,
"or": false,
"etherType": 2048
},
{
"type": "MATCH_ETHERTYPE",
"not": true,
"or": false,
"etherType": 2054
},
{
"type": "MATCH_ETHERTYPE",
"not": true,
"or": false,
"etherType": 34525
},
{
"type": "ACTION_DROP"
},
{
"type": "ACTION_ACCEPT"
}
]

View file

@ -0,0 +1,25 @@
# This is a default rule set that allows IPv4 and IPv6 traffic but otherwise
# behaves like a standard Ethernet switch.
#
# Allow only IPv4, IPv4 ARP, and IPv6 Ethernet frames.
#
drop
not ethertype ipv4
and not ethertype arp
and not ethertype ipv6
;
#
# Uncomment to drop non-ZeroTier issued and managed IP addresses.
#
# This prevents IP spoofing but also blocks manual IP management at the OS level and
# bridging unless special rules to exempt certain hosts or traffic are added before
# this rule.
#
#drop
# not chr ipauth
#;
# Accept anything else. This is required since default is 'drop'.
accept;

44
backend-py/main.py Normal file
View file

@ -0,0 +1,44 @@
from flask import Flask, send_from_directory
from flask_cors import CORS
import os
app = Flask(__name__, static_folder="app")
app.config['CORS_HEADERS'] = 'Content-Type'
cors = CORS(app, resources={r"/*": {"origins": "*"}})
from routes.network import route_network
from routes.auth import route_auth
from routes.member import route_member
from routes.user_managment import route_user_managment
app.register_blueprint(route_auth)
app.register_blueprint(route_network)
app.register_blueprint(route_member)
app.register_blueprint(route_user_managment)
@app.errorhandler(404)
def not_found(e):
return app.send_static_file('index.html')
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def serve(path):
if path != "" and os.path.exists(app.static_folder + '/' + path):
return send_from_directory(app.static_folder, path)
else:
return send_from_directory(app.static_folder, 'index.html')
if __name__ == "__main__":
print("""
/-----------------\\
| 𝕬̊𝖗𝖊 𝖛𝖔𝖒 𝕰𝖎𝖘𝖛𝖔𝖑𝖐 |
\-----------------/
""")
app.run(host='0.0.0.0', port=80)

View file

@ -0,0 +1,6 @@
flask
flask_cors
bcrypt
tinydb
loguru
requests

28
backend-py/routes/auth.py Normal file
View file

@ -0,0 +1,28 @@
from flask import jsonify, request, Blueprint
from untils.response import Response
import os
from werkzeug.exceptions import BadRequest
import sys, os
sys.path.append(os.getcwd())
from constants.constants import *
from vars import DARWIN, LINUX
sys_vars = DARWIN() if os.uname().sysname == "Darwin" else LINUX()
route_auth = Blueprint('auth', __name__)
@route_auth.route('/auth/login', methods=['POST'])
def login():
login_data = request.json
userdata = db.read_user_credentials(login_data.get("username"))
if userdata != []:
if hash.is_correct_password(userdata[0].get("salted_passwort"), login_data.get("password")):
response_object = Response(200, "ok", {"role" : userdata[0].get("role"), "token" : "eisvolk"})
else:
raise BadRequest()
else:
raise BadRequest()
return jsonify(response_object)

View file

@ -0,0 +1,67 @@
from flask import jsonify, request, Blueprint
from untils.response import Response
import sys, os
sys.path.append(os.getcwd())
from constants.constants import *
from services.member import MEMBER
services_member = MEMBER()
from vars import DARWIN, LINUX
sys_vars = DARWIN() if os.uname().sysname == "Darwin" else LINUX()
route_member = Blueprint('member', __name__)
@route_member.route('/api/network/<string:nwid>/member', methods=['GET', 'DELETE'])
def api_network_nwid_member(nwid : str):
if request.method == "GET":
mids = zerotier_api.make_request(request.method, f"/controller/network/{nwid}/member")
data = services_member.getMembersData(nwid, mids.json().keys())
response_object = Response(200, "ok", data)
return jsonify(response_object)
@route_member.route('/api/network/<string:nwid>/member/<string:mid>', methods=['DELETE', 'POST'])
def api_network_nwid_member_member(nwid : str, mid : str):
if request.method == "POST":
services_member.updateMemberAdditionalData(nwid, mid, request.json)
if type(request.json) == list:
zerotier_api.make_request(request.method, f"/controller/network/{nwid}/member/{mid}", json = {"authorized" : request.json[0]["authorized"]})
data = services_member.getMembersData(nwid, [mid])
elif (request.json.get("config")):
zerotier_api.make_request(request.method, f"/controller/network/{nwid}/member/{mid}", json = request.json["config"])
data = services_member.getMembersData(nwid, [mid])
else:
data = services_member.getMembersData(nwid, [mid])
response_object = Response(200, "ok", data)
elif request.method == "DELETE":
services_member.deleteMemberAdditionalData(nwid, mid)
zerotier_api.make_request(request.method, f"/controller/network/{nwid}/member/{mid}")
defaultConfig = {
"authorized": False,
"ipAssignments": [],
"capabilities": [],
"tags": []
}
zerotier_api.make_request("POST", f"/controller/network/{nwid}/member/{mid}", json = defaultConfig)
response_object = Response(200, "ok")
return jsonify(response_object)

View file

@ -0,0 +1,72 @@
from flask import jsonify, request, Blueprint
from untils.response import Response
import sys, os
sys.path.append(os.getcwd())
from constants.constants import *
from services.network import NETWORK
services_network = NETWORK()
from vars import DARWIN, LINUX
sys_vars = DARWIN() if os.uname().sysname == "Darwin" else LINUX()
route_network = Blueprint('network', __name__)
@route_network.route('/api/network', methods=['GET', 'POST'])
def api_network():
"""Get all Networks"""
if request.method == 'GET':
controllerResponse = zerotier_api.make_request("GET", "/controller/network")
nwids = controllerResponse.json()
data = services_network.getNetworksData(nwids)
response_object = Response(200, "ok", data)
elif request.method == 'POST':
"""Create new Network"""
zt = zerotier_api.make_request("GET", "/status").json()["address"]
reqData = request.json["config"]
reqData["rules"] = sys_vars.default_rules
response = zerotier_api.make_request("POST", f"/controller/network/{zt}______", json = reqData)
data = services_network.getNetworksData([response.json()["id"]])
response_object = Response(200, "ok", data[0])
return jsonify(response_object)
@route_network.route('/api/network/<string:nwid>', methods=['GET', 'DELETE', 'POST'])
def api_network_nwid(nwid : str):
if request.method == "GET":
network = services_network.getNetworksData([nwid])[0]
response_object = Response(200, "ok", network)
elif request.method == "DELETE":
zerotier_api.make_request(request.method, f"/controller/network/{nwid}")
response_object = Response(200, "ok", db.delete_network(nwid))
elif request.method == "POST":
services_network.updateNetworkAdditionalData(nwid, request.json)
if request.json.get("config"):
services_network.updateNetworkAdditionalData(nwid, request.json["config"])
zerotier_api.make_request("POST", f"/controller/network/{nwid}", json = request.json.get("config"))
response_object = Response(200, "ok")
return jsonify(response_object)

View file

@ -0,0 +1,84 @@
from flask import jsonify, request, Blueprint
from untils.response import Response
from threading import Timer
import sys, os
sys.path.append(os.getcwd())
from constants.constants import *
from services.user_managment import USER_MANAGMENT
services_user_managment = USER_MANAGMENT()
from vars import DARWIN, LINUX
sys_vars = DARWIN() if os.uname().sysname == "Darwin" else LINUX()
route_user_managment = Blueprint('user_managment', __name__)
@route_user_managment.route('/api/user-managment', methods=['GET', 'POST'])
def api_user_managment():
if request.method == 'POST':
user = services_user_managment.createUserData(request.json["username"], request.json["passwort"], request.json["role"])
response_object = Response(200, "ok")
elif request.method == 'GET':
users = []
data = services_user_managment.getUsersData()
for user in data:
users.append(user.get("username"))
response_object = Response(200, "ok", users)
return jsonify(response_object)
@route_user_managment.route('/api/user-managment/del/<string:user>', methods=['GET', 'POST'])
def api_user_managment_del(user : str):
if request.method == 'POST':
user = services_user_managment.delUserData(user)
if user:
response_object = Response(200, "ok")
return jsonify(response_object)
@route_user_managment.route('/api/user-managment/member/<string:user>', methods=['GET', 'POST'])
def api_user_managment_update_member(user : str):
if request.method == 'GET':
member = services_user_managment.getUserMember(user)
for network in member:
for idx, user_member in enumerate(member[network]):
auth = services_user_managment.isUserMemberAuth(network, user_member["address"])
member[network][idx]["authorized"] = auth
response_object = Response(200, "ok", member)
elif request.method == 'POST':
services_user_managment.updateUserMember(user, request.json["right"])
response_object = Response(200, "ok")
return jsonify(response_object)
@route_user_managment.route('/api/user-managment/member/<string:user>/reason', methods=['POST'])
def api_user_managment_log_reason(user : str):
if request.method == 'POST':
logger.info(f'Reason : {request.json["reason"]} | Duration : {request.json["duration"]}', ip=request.remote_addr, username=user)
t = Timer(services_user_managment.convert(request.json["duration"]), services_user_managment.deauth, args=(request.json["nwid"],request.json["mid"]))
t.start()
response_object = Response(200, "ok")
return jsonify(response_object)

View file

@ -0,0 +1,121 @@
import sys, os, time
import sys, os
sys.path.append(os.getcwd())
from constants.constants import *
from vars import DARWIN, LINUX
sys_vars = DARWIN() if os.uname().sysname == "Darwin" else LINUX()
class MEMBER():
"""Classe um die Anfragen rund ums MemberVerwaltung zu handeln"""
def getPeer(self, mid : str) -> dict:
return zerotier_api.make_request("GET", f"/peer/{mid}")
def filterDeleted(self, nwid : str, mid : str) -> str:
try:
if db.read_networks_member_value(nwid, mid).get("deleted") != True:
return mid
except:
return mid
def getMembersData(self, nwid : str , mids : str) -> list:
members = []
for mid in mids:
if self.filterDeleted(nwid, mid):
member = zerotier_api.make_request("GET", f"/controller/network/{nwid}/member/{mid}").json()
members.append(self.getMemberAdditionalData(member))
return members
def getMemberAdditionalData(self, data : dict) -> dict:
#DB BUG INITIALIZATION MIGRATION
if db.read_networks_member_value(data.get("nwid"),data.get("id")) == None:
members = db.read_networks_value(data.get("nwid"))[0].get("members")
if members != None:
members.append({"id" : data.get("address")})
db.table_update("networks", {"members" : members } , db.read_networks_value(data.get("nwid"))[0].doc_id)
else:
db.table_update("networks", {"members" : [{"id" : data.get("address")}] } , db.read_networks_value(data.get("nwid"))[0].doc_id)
#END MIGRATION SECTION
peerData = {}
zt = zerotier_api.make_request("GET", "/status").json()["address"]
peer = self.getPeer(data.get("id")).json()
if (peer):
peerData["latency"] = peer.get("latency")
peerData["online"] = 1 if peer.get("latency") != -1 else 2
peerData["clientVersion"] = peer.get("version")
if peer.get("paths"):
peerData["lastOnline"] = peer.get("paths")[0]["lastReceive"]
peerData["physicalAddress"] = peer.get("paths")[0]["address"].split("/")[0]
peerData["physicalPort"] = peer.get("paths")[0]["address"].split("/")[1]
else:
peerData["online"] = 0
del data["lastAuthorizedCredential"]
del data["lastAuthorizedCredentialType"]
del data["objtype"]
del data["remoteTraceLevel"]
del data["remoteTraceTarget"]
additionalData = {}
try :
additionalData = db.read_networks_member_value(data.get("nwid"),data.get("id"))["additionalConfig"]
except:
additionalData = {}
memberData = {
"id" : data.get("nwid") + "-" + data.get("id"),
"type" : "Member",
"clock" : time.time(),
"networkId" : data.get("nwid"),
"nodeId" : data.get("id"),
"controllerId" : zt,
**additionalData,
**peerData,
"config" : data
}
return memberData
def updateMemberAdditionalData(self, nwid : str, mid : str, data : dict) -> None:
try:
additionalConfig = db.read_networks_member_value(nwid, mid)["additionalConfig"]
except:
additionalConfig = {}
## Only for Usermanagment
if type(data) == list:
data = data[0]
authorized = data["authorized"] if data["authorized"] == True else False
db.update_network_members(nwid, mid, {"additionalConfig" : db.merge_two_dicts(additionalConfig, {"authorized" : authorized})} )
##
else:
if data.get("config") and data.get("config").get("authorized") != None:
authorized = data.get("config")["authorized"] if data.get("config")["authorized"] == True else False
db.update_network_members(nwid, mid, {"additionalConfig" : db.merge_two_dicts(additionalConfig, {"authorized" : authorized})} )
additionalData = data.get("config") if data.get("config") != None else {}
if (data.get("name")):
additionalData["name"] = data["name"]
if (data.get("description")):
additionalData["description"] = data["description"]
if additionalData:
db.update_network_members(nwid, mid, {"additionalConfig" : db.merge_two_dicts(additionalConfig, additionalData)})
def deleteMemberAdditionalData(self, nwid, mid) -> None:
db.update_network_members(nwid, mid, {"id": mid, "deleted" : True})

View file

@ -0,0 +1,79 @@
import sys, os, time
sys.path.append(os.getcwd())
from constants.constants import *
from vars import DARWIN, LINUX
sys_vars = DARWIN() if os.uname().sysname == "Darwin" else LINUX()
class NETWORK():
"""Classe um die Anfragen rund ums Networking zu handeln"""
def getNetworksData(self, nwids : list) -> list:
networks = []
for nwid in nwids:
controllerResponse = zerotier_api.make_request("GET", f"/controller/network/{nwid}").json()
networks.append(self.getNetworkAdditionalData(controllerResponse))
return networks
def getNetworkAdditionalData(self, data : dict) -> dict:
if db.read_networks_value(data.get("id")) == []:
self.createNetworkAdditionalData(data.get("id"))
additionalData = db.read_networks_value(data.get("id"))[0].get("additionalConfig")
del data["rulesSource"]
del data["objtype"]
del data["revision"]
del data["remoteTraceLevel"]
del data["remoteTraceTarget"]
networkData = {
"id" : data.get("id"),
"type" : "Network",
"clock" : time.time(),
"config" : data,
}
networkData = db.merge_two_dicts(networkData,additionalData)
return networkData
def createNetworkAdditionalData(self, nwid : str) -> None:
saveData = {
"id": nwid,
"additionalConfig": {
"description": "",
"rulesSource": sys_vars.default_rule_source,
"tagsByName": {},
"capabilitiesByName": {},
},
"members": [],
}
db.write_value("networks", saveData)
def updateNetworkAdditionalData(self, nwid : str, data : dict) -> None:
additionalData = {}
if (data.get("name")):
additionalData["name"] = data.get("name")
if (data.get("description")):
additionalData["description"] = data.get("description")
if (data.get("rulesSource")):
additionalData["rulesSource"] = data.get("rulesSource")
if (data.get("tagsByName")):
additionalData["tagsByName"] = data.get("tagsByName")
if (data.get("capabilitiesByName")):
additionalData["capabilitiesByName"] = data.get("capabilitiesByName")
if (additionalData):
db.update_network(nwid, additionalData)

View file

@ -0,0 +1,66 @@
import sys, os, time
from typing import Hashable
sys.path.append(os.getcwd())
from constants.constants import *
from services.member import MEMBER
from vars import DARWIN, LINUX
sys_vars = DARWIN() if os.uname().sysname == "Darwin" else LINUX()
class USER_MANAGMENT():
"""Classe um die Anfragen rund ums UserManagment zu handeln"""
def _hash_passwort(self, passwort : str) -> Hashable:
return hash.hash_password(passwort)
def createUserData(self, username : str, passwort : str, role : str):
if db.read_user_credentials(username) == []:
db.write_user_credentials(**{"username" : username, "salted_passwort" : self._hash_passwort(passwort), "role" : role})
return True
def delUserData(self, username : str) -> bool:
if db.read_user_credentials(username) != []:
db.delete_user(username)
return True
def getUsersData(self) -> list:
return db.read_users()
def getUserMember(self, user : str) -> dict:
members = db.read_user_member(user)[0].get("members")
if members:
return members
def updateUserMember(self, user : str, data : dict) -> None:
db.clean_user_member(user)
if data[0].get("config") != None:
for member in data:
memberData = {"name" : self.fix_none_username(member.get("name")), "address" : member.get("config")["address"]}
time.sleep(0.5)
db.update_user_member(user, member["networkId"], memberData)
else:
db.clean_user_member(user)
def isUserMemberAuth(self, nwid : str, mid : str) -> bool:
data = MEMBER().getMembersData(nwid, [mid])
return data[0]["config"]["authorized"]
def fix_none_username(self, name : str) -> str:
if name == None:
name = "Dummy Name"
return name
def deauth(self, nwid, mid):
zerotier_api.make_request("POST", f"/controller/network/{nwid}/member/{mid}", json = {"authorized" : False})
def convert(self, hours : int):
minuts = hours * 60
seconds = minuts * 60
return seconds

142
backend-py/untils/db.py Normal file
View file

@ -0,0 +1,142 @@
import os
import sys
import os, time
sys.path.append(os.getcwd())
from tinydb import TinyDB, Query
from tinydb import TinyDB, where
from tinydb.table import Document
from vars import DARWIN, LINUX
sys_vars = DARWIN() if os.uname().sysname == "Darwin" else LINUX()
class Zero_DB():
"""TinyDB Json Datenbank um verschiedene Werte zu speicher"""
"""PS: Das war teilweiße ein ganz schöner Krampf"""
def __init__(self, path=sys_vars.default_config_path, **kwargs) -> None:
self.db = TinyDB(path + 'db.json', indent=4)
self.users = self.db.table("users")
self.networks = self.db.table("networks")
self.query = Query()
def write_user_credentials(self, username, salted_passwort, role):
self.users.insert({
"username" : username,
"salted_passwort" : salted_passwort,
"role" : role
})
def merge_two_dicts(self, x, y):
"""Given two dictionaries, merge them into a new dict as a shallow copy."""
z = x.copy()
z.update(y)
return z
def read_user_credentials(self, user):
return self.users.search(self.query.username == user)
def read_user_member(self, user):
return self.users.search(self.query.username == user)
def read_users(self):
return self.users.all()
def read_networks_value(self, value):
return self.networks.search(self.query.id == value)
def read_networks_member_value(self, nwid, mid):
for item in self.networks.search(self.query.id == nwid):
if item.get("members") != None:
for member in item.get("members"):
if member.get("id") == mid:
return member
def update_network(self, nwid, data):
self.networks.upsert(Document({ "additionalConfig" : self.merge_two_dicts(self.read_networks_value(nwid)[0].get("additionalConfig"), data)}, doc_id=self.read_networks_value(nwid)[0].doc_id))
def update_network_members(self, nwid, mid, data):
upd = []
for item in self.read_networks_value(nwid)[0]["members"]:
if item["id"] == mid:
upd.append(self.merge_two_dicts(item, data))
else:
upd.append(item)
self.networks.upsert(Document({ "members" : upd }, doc_id=self.read_networks_value(nwid)[0].doc_id))
def find_networks_id_by_member(self, value):
for network in self.networks.all():
for member in network.get("members"):
if member == value:
return network.get("id")
def clean_user_member(self, user):
self.users.upsert(Document({ "members" : {} }, doc_id=self.read_user_member(user)[0].doc_id))
def update_user_member(self, user, key, data : dict):
members = {}
def _get_dict_values(item : str):
return self.read_user_member(user)[0].get("members")[item]
def _get_dict_key(item : dict):
return item
if self.read_user_member(user)[0].get("members") != None and self.read_user_member(user)[0].get("members") != {}:
popd = self.read_user_member(user)[0].get("members")
try:
popd[key]
except:
pass
members.update(popd)
for item in self.read_user_member(user)[0].get("members"):
if _get_dict_key(item) == key:
members.update({key : [data, *_get_dict_values(item)]})
else:
members.update({key : [data]})
else:
members.update({key : [data]})
self.users.upsert(Document({ "members" : members }, doc_id=self.read_user_member(user)[0].doc_id))
def delete_network(self, value):
return self.networks.remove(self.query.id == value)
def delete_user(self, value):
return self.users.remove(self.query.username == value)
def write_value(self, table, value):
self.db.table(table).insert(value)
def read_table_values(self, table):
return self.db.table(table).all()
def table_update(self, table, value, doc_id):
self.db.table(table).update(value, doc_ids=[doc_id])
def return_db(self):
return self.db
if __name__ == "__main__":
db = Zero_DB()
nwid = "95004cf76d12a86e"
mid = "95004cf76d"
from tinydb.operations import set
from tinydb.table import Document
db.networks.remove(where('id') == 'xxx')

View file

@ -0,0 +1,9 @@
from dataclasses import dataclass, field
@dataclass
class Response():
"""Default Response für Frontend"""
success : int
message : str
data : list = field(default=None)

View file

@ -0,0 +1,26 @@
from typing import Tuple
import bcrypt
class HASH():
"""Classe zum Hashen und zum Salzen des Passwortest"""
def hash_password(self, password: str) -> Tuple[bytes, bytes]:
"""
Hash the provided password with a randomly-generated salt and return the
hash to store in the database.
"""
salt = bcrypt.gensalt()
pw_hash = bcrypt.hashpw(password.encode(encoding='utf-8'), salt)
return pw_hash.decode()
def is_correct_password(self, pw_hash: str, password: str) -> bool:
"""
Given a previously-stored hash, and a password provided by a user
trying to log in, check whether the password is correct.
"""
success = bcrypt.checkpw(password.encode('utf-8'), pw_hash.encode())
return success

View file

@ -0,0 +1,21 @@
import sys
import os
import requests
sys.path.append(os.getcwd())
from vars import DARWIN, LINUX
sys_vars = DARWIN() if os.uname().sysname == "Darwin" else LINUX()
class ZEROTIER_REST_API():
"""Classe um Requests an die Zerotier API zu machen"""
token = sys_vars.zerotier_token
headers = {"Accept": "application/json", "Content-Type": "application/json", "X-ZT1-Auth": token}
base_URL = "http://localhost:9993" if os.environ.get("ZU_CONTROLLER_ENDPOINT") == None else str(os.environ["ZU_CONTROLLER_ENDPOINT"])
def make_request(self, method: str, url_piece: str, **kwargs) -> dict:
response = requests.request(method, url=self.base_URL + url_piece, headers=self.headers, **kwargs)
return response

View file

@ -0,0 +1,27 @@
import sys
class LogurLogger(object):
def __init__(self, logger) -> None:
self.logger = logger
self.logger.remove(0)
self.logger.add("./config/UserAuth.log", format=self.default_formatter, colorize=False, rotation="10 MB")
self.logger.add(sys.stderr, format=self.default_formatter,colorize=True)
def rainbow(self, text):
colors = ["red", "yellow", "green", "cyan", "blue", "magenta"]
chars = ("<{}>{}</>".format(colors[i % len(colors)], c) for i, c in enumerate(text.split()))
return " ".join(chars)
def default_formatter(self, record):
rainbow_message = self.rainbow(record["message"])
# Prevent '{}' in message (if any) to be incorrectly parsed during formatting
escaped = rainbow_message.replace("{", "{{").replace("}", "}}")
return "<blue>{time}</blue> | <green>{level}</green> | {extra[ip]} | {extra[username]} | " + escaped + "\n{exception}"
def getlogger(self):
return self.logger

41
backend-py/vars.py Normal file
View file

@ -0,0 +1,41 @@
import pathlib
import os
class OS_DEFAULTS():
"""Classe um die Default Pfade und Variablen bereitzustellen"""
default_config_path = './config/'
try:
username = str(os.environ['ZU_DEFAULT_USERNAME'])
password = str(os.environ['ZU_DEFAULT_PASSWORD'])
except:
username = "admin"
password = "zero-ui"
default_admin_creditals = {
"username" : username,
"passwort" : password
}
default_rule_source = open("./constants/default_rules_source.txt").read()
default_rules = open("./constants/default_rules.json").read()
class DARWIN(OS_DEFAULTS):
"""Classe um die Pfade und Variablen des Betriebsystems bereitzustellen"""
def __init__(self) -> None:
self.default_path = str(pathlib.Path.home()) + "/Zerotier"
self.zerotier_token = open("/Library/Application Support/ZeroTier/One/authtoken.secret", encoding="utf-8").read()
class LINUX(OS_DEFAULTS):
"""Classe um die Pfade und Variablen des Betriebsystems bereitzustellen"""
def __init__(self) -> None:
self.default_path = "/Zerotier"
self.zerotier_token = open("/var/lib/zerotier-one/authtoken.secret", encoding="utf-8").read()

View file

@ -11,20 +11,33 @@ COPY ./frontend /app/frontend
RUN yarn build
FROM node:lts-alpine
### Node Backend
#FROM node:lts-alpine
#
#WORKDIR /app/frontend/build
#COPY --from=build-stage /app/frontend/build /app/frontend/build/
#
#WORKDIR /app/backend
#COPY ./backend/package*.json /app/backend
#RUN yarn install
#
#COPY ./backend /app/backend
#
#EXPOSE 4000
#ENV NODE_ENV=production
#ENV ZU_SECURE_HEADERS=true
#ENV ZU_SERVE_FRONTEND=true
#
#CMD [ "node", "./bin/www" ]
WORKDIR /app/frontend/build
COPY --from=build-stage /app/frontend/build /app/frontend/build/
#### Python Backend
FROM tiangolo/uwsgi-nginx-flask:python3.9
WORKDIR /app/backend
COPY ./backend/package*.json /app/backend
RUN yarn install
COPY ./backend-py/requirements.txt /app/requirements.txt
COPY ./backend /app/backend
RUN pip install --no-cache-dir --upgrade -r /app/requirements.txt
EXPOSE 4000
ENV NODE_ENV=production
ENV ZU_SECURE_HEADERS=true
ENV ZU_SERVE_FRONTEND=true
CMD [ "node", "./bin/www" ]
COPY ./backend-py /app
COPY --from=build-stage /app/frontend/build /app/app/
ENV FLASK_APP=main.py
EXPOSE 80