2024-03-04 08:38:38 +00:00
|
|
|
import os
|
|
|
|
import time
|
|
|
|
import requests
|
2024-07-17 10:45:12 +00:00
|
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
import pytz
|
2024-03-04 08:38:38 +00:00
|
|
|
# --- Scheduling (all durations are in seconds) -------------------------------
# Pause between two full cycles of the main loop: 3 hours.
interval = 60 * 60 * 3
# Pause after each report-generation request: 2 minutes.
update_cost = 60 * 2
# Pause after each per-rule scan request.
scan_cost = 10
# time.time() value recorded by semi_scan after its last pass; None until then.
last_scan = None
# Minimum time between two scan passes: 24 hours.
scan_interval = 24 * 60 * 60

# --- Connection settings, read from environment variables --------------------
# Base URL of the API. A trailing slash is stripped so that the endpoint URLs
# built below never contain a double slash.
# NOTE(review): the fallback "localhost" carries no URL scheme, and requests
# refuses scheme-less URLs, so PROXY presumably must always be set - confirm.
proxy_url = os.getenv('PROXY', "localhost").rstrip("/")
user = os.getenv('ADMIN_USER_NAME', "")
password = os.getenv('ADMIN_PASSWORD', "")

# Define the login API URL
login_url = f'{proxy_url}/api/ctel/login/'
# Token taken from the login response; None until a login has succeeded.
login_token = None

# Define the login credentials
login_credentials = {
    'username': user,
    'password': password,
}

# Define the report ("update") API and its payload template; the duration and
# subsidiary fields are filled in for every individual request.
update_url = f'{proxy_url}/api/ctel/make_report/'
update_data = {
    'is_daily_report': True,
    'report_overview_duration': '',
    'subsidiary': None,
}

# Define the scan API
scan_list_url = f'{proxy_url}/api/automation/'
# "(id)" is a placeholder that is replaced with a rule id before each request.
scan_create_url = f'{proxy_url}/api/automation/(id)/scan/'
|
|
|
|
|
|
|
|
def _format_scan_timestamp(moment):
    """Format *moment* for the scan payload: millisecond precision + "+08:00"."""
    return moment.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "+08:00"


def semi_scan(login_token):
    """Trigger a scan for every automation rule, at most once per scan_interval.

    The rule list is fetched from ``scan_list_url`` and one scan request is
    posted per rule, sleeping ``scan_cost`` seconds after each. The module-level
    ``last_scan`` is updated once all rules have been processed.

    Args:
        login_token: Value sent in the ``Authorization`` header.

    Raises:
        requests.HTTPError: If the rule listing returns an error status, so
            the failure is reported as such instead of surfacing later as an
            unrelated error while iterating the error body.
    """
    global last_scan
    headers = {'Authorization': login_token}
    sg_tz = pytz.timezone("Asia/Singapore")

    # check if last scan is [scan_interval] ago
    if last_scan is None:
        # First pass: always scan, covering one full scan_interval. Deciding
        # this explicitly avoids comparing two float clock reads for equality.
        last_scan = time.time() - scan_interval
    elif time.time() - last_scan < scan_interval:
        return

    # get all rules:
    list_rules_response = requests.get(scan_list_url, headers=headers)
    list_rules_response.raise_for_status()
    rules = list_rules_response.json()  # parse the body once, reuse below
    print(f"[INFO]: Total {len(rules)} rules returned from server")

    # process rule one by one
    for rule in rules:
        now = datetime.now(sg_tz)
        elapsed = timedelta(seconds=time.time() - last_scan)
        # NOTE(review): "start_date" is the current time and "end_date" is the
        # time of the previous scan, i.e. start is later than end. Kept as-is
        # because the server contract is not visible here - confirm.
        data = {
            "start_date": _format_scan_timestamp(now),
            "end_date": _format_scan_timestamp(now - elapsed),
        }
        response = requests.post(scan_create_url.replace("(id)", str(rule["id"])), json=data, headers=headers)
        # Best effort: a failing rule is only logged so the others still run.
        print("[INFO]: scanning rule {} with data: {} status code: {}".format(rule["id"], data, response.status_code))
        time.sleep(scan_cost)

    last_scan = time.time()
|
|
|
|
|
2024-03-04 08:38:38 +00:00
|
|
|
|
|
|
|
# def update_report(login_token, report_overview_duration=["30d", "7d"], subsidiary=["all", "SEAU", "SESP", "SME", "SEPCO", "TSE", "SEIN"]):
|
|
|
|
def update_report(login_token, report_overview_duration=("7d", "30d"), subsidiary=("SEAO", "SEAU", "SESP", "SME", "SEPCO", "TSE", "SEIN")):
    """Request a report for every (duration, subsidiary) combination.

    One POST is sent to ``update_url`` per combination, followed by a pause of
    ``update_cost`` seconds.

    Args:
        login_token: Value sent in the ``Authorization`` header.
        report_overview_duration: Iterable of duration values, iterated in
            the outer loop.
        subsidiary: Iterable of subsidiary values, iterated in the inner loop.

    Raises:
        requests.HTTPError: As soon as one request returns an error status;
            the remaining combinations are not attempted.
    """
    headers = {'Authorization': login_token}
    for dur in report_overview_duration:
        for sub in subsidiary:
            # Build a per-request payload rather than mutating the shared
            # module-level template; key order matches the template.
            payload = {**update_data, "report_overview_duration": dur, "subsidiary": sub}
            update_response = requests.post(update_url, data=payload, headers=headers)
            print("[INFO]: update_response at {} by {} - {} with status {}".format(datetime.now(), dur, sub, update_response.status_code))
            update_response.raise_for_status()
            time.sleep(update_cost)
|
|
|
|
|
|
|
|
# Define the interval in seconds between API calls
|
|
|
|
# time.sleep(60)
|
|
|
|
|
|
|
|
def _login():
    """Request a token from the login API.

    Returns:
        The token from the response body, or None when the request fails, the
        server answers with a status other than 200, or the body has no token.
    """
    try:
        login_response = requests.post(login_url, data=login_credentials)
        if login_response.status_code != 200:
            print(f"[ERROR]: login failed with status {login_response.status_code}")
            return None
        token = login_response.json()['token']
    except (requests.RequestException, ValueError, KeyError) as e:
        # A failed login must not terminate the polling loop.
        print(f"[ERROR]: login failed: {e}")
        return None
    print("[INFO] relogged in at {}".format(datetime.now()))
    return token


# Main polling loop: one scan + report cycle every `interval` seconds.
while True:
    # Call the login API and retrieve the login token
    if not login_token:
        login_token = _login()

    # Call the scan and update APIs
    try:
        semi_scan(login_token)
        update_report(login_token)
    except Exception as e:
        print(f"[ERROR]: {e}")
        print("[ERROR]: Failed to update_response, retrying...")
        # The token may no longer be accepted: log in again and retry the
        # report once. The retry is guarded so that a second failure is
        # logged and the loop carries on with the next cycle.
        fresh_token = _login()
        if fresh_token:
            login_token = fresh_token
            try:
                update_report(login_token)
            except Exception as retry_error:
                print(f"[ERROR]: {retry_error}")

    # Wait for the specified interval
    time.sleep(interval)
|