79 lines
2.7 KiB
Python
Executable File
79 lines
2.7 KiB
Python
Executable File
import json
|
|
from datetime import date, datetime, timedelta
|
|
from typing import List
|
|
|
|
import requests
|
|
|
|
# strftime/strptime pattern shared by the request's "From"/"To" timestamps
# and the parsing of each response point's "Date" field.
date_format = "%Y-%m-%dT%H:%M:%S%z"
|
|
|
|
|
|
class EngieAPI:
    """Small client that fetches EPEX spot values from the Engie gateway.

    When caching is enabled, results are stored on disk as one JSON object
    mapping ``YYYY-MM-DD`` keys to the list of values fetched for that day.
    """

    # Seconds to wait for the HTTP response, so that a stalled connection
    # cannot block the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, cache: bool = False):
        """Set up the endpoint, the request headers and the cache settings.

        Args:
            cache: When true, ``get_epex_spot`` reads from and writes to the
                JSON file named by ``self.cache_file``.
        """
        self.url = "https://benergy-back.azurewebsites.net/api/gateway/mercure"
        # NOTE(review): the fixed "Content-Length" cannot match a JSON body
        # whose size varies; requests is expected to compute the real length
        # itself -- confirm and consider dropping that header.
        self.headers = {
            "Host": "benergy-back.azurewebsites.net",
            "Content-Type": "application/json",
            "Origin": "https://engieenergy.azurewebsites.net",
            "Accept-Encoding": "gzip, deflate, br",
            "Accept": "application/json, text/plain, */*",
            "User-Agent": "Mozilla/5.0 (Linux; Android 7.0; SM-G930V Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.125 Mobile Safari/537.36",
            "Referer": "https://engieenergy.azurewebsites.net/",
            "Content-Length": "294",
            "Accept-Language": "en-GB,en;q=0.9",
        }

        self.cache = cache
        self.cache_file = "EngieCache.json"

    def _load_cache(self) -> dict:
        """Return the parsed cache file, or an empty dict if it does not exist."""
        try:
            with open(self.cache_file, "r") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def get_epex_spot(self, date: date) -> List[float]:
        """Return the values reported for the given day.

        Args:
            date: Day to look up. Only its year, month and day are used.

        Returns:
            The ``"Value"`` of every response point whose ``"Date"`` falls on
            the requested day, in the order the service returned them.

        Raises:
            ValueError: If no point matches the requested day.
        """
        day_key = date.strftime("%Y-%m-%d")

        if self.cache:
            cached = self._load_cache().get(day_key)
            # An empty entry carries no data (older versions could store one
            # right before raising), so treat it as a miss and fetch again.
            if cached:
                return cached

        # The window runs from 00:00:00 of the previous day to 23:59:59 of the
        # requested day; points from other days are filtered out below.
        # NOTE(review): both datetimes are naive, so "%z" renders as an empty
        # string in the request -- confirm the service accepts that.
        to_date = datetime(date.year, date.month, date.day, 23, 59, 59)
        from_date = to_date - timedelta(days=1, hours=23, minutes=59, seconds=59)

        data = {
            "ApplicationDate": {
                "From": from_date.strftime(date_format),
                "To": to_date.strftime(date_format),
                "UseTime": True,
            },
            "IncludeOffset": True,
            # NOTE(review): 7070 is presumably the id of the EPEX spot
            # series on the service side -- confirm.
            "Items": [{"IdRef": 7070}],
        }

        res = requests.post(
            self.url,
            headers=self.headers,
            data=json.dumps(data),
            timeout=self.REQUEST_TIMEOUT,
        )

        # Keep only the points whose own calendar date (as written in the
        # response, offset included) equals the requested day.
        values = [
            point["Value"]
            for point in res.json()[0]["Points"]
            if datetime.strptime(point["Date"], date_format).date().strftime("%Y-%m-%d")
            == day_key
        ]

        # Fail before touching the cache so that an empty result is never
        # persisted and silently returned by later calls.
        if not values:
            raise ValueError("No data found for this date")

        if self.cache:
            # Re-read the file so entries written since the lookup above are
            # not overwritten.
            cache = self._load_cache()
            cache[day_key] = values
            with open(self.cache_file, "w") as f:
                json.dump(cache, f)

        return values
|