import datetime
import importlib
import random
import shutil
import time
from pathlib import Path, PurePath
from typing import Generator, Iterator, List, Tuple

import numpy as np
import pandas as pd

from . import functions
from .utils import COLUMNS, gen_fake_cpu_work
def _make_empty_df() -> 'pd.DataFrame':
    """Generate an empty DataFrame with the columns indicated in the COLUMNS dict.

    Every key of COLUMNS becomes a column; the associated value is used as
    the dtype of the (empty) Series stored in that column.

    :return: a new DataFrame
    :rtype: pd.DataFrame
    """
    df = pd.DataFrame(index=None)
    for column, type_ in COLUMNS.items():
        # An empty, typed Series fixes the column dtype before any row exists.
        df[column] = pd.Series(dtype=type_)
    return df
class Day(object):
    """A single day of requests stored in a pandas DataFrame."""

    # Job-related columns that are filled with fake values when the row
    # does not provide them.
    _CPU_FIELDS = ('NumCPU', 'WrapWC', 'WrapCPU', 'CPUTime', 'IOTime')

    def __init__(self, date: 'datetime.date', df: 'pd.DataFrame' = None):
        """Initialize current day basic information.

        :param date: The current date of the Day object
        :type date: datetime.date
        :param df: an existing dataframe to wrap; when None a new empty
                   dataframe is created with `_make_empty_df`
        :type df: pd.DataFrame, optional
        """
        self._date = date
        self._df = df if df is not None else _make_empty_df()

    def __repr__(self):
        return f"{self._date}=>{self._df.to_string()}"

    @property
    def df(self):
        """The dataframe that holds the day's rows."""
        return self._df

    def reset_index(self):
        """Reset the dataframe index inplace.

        :return: self
        :rtype: Day
        """
        self._df.reset_index(drop=True, inplace=True)
        return self

    def _req_time(self) -> int:
        """Return the day's date as an integer timestamp (local time)."""
        return int(time.mktime(self._date.timetuple()))

    def _prepare_row(self, row: dict, req_time: int) -> dict:
        """Return a copy of `row` with the day's default values applied.

        The caller's dict is left untouched. Fake cpu work is generated only
        when at least one of the job fields is missing, and it only fills
        the missing fields: values supplied by the caller are preserved.

        :param row: the row's columns
        :type row: dict
        :param req_time: value stored in the `reqDay` column
        :type req_time: int
        :return: a new dict ready to be inserted in the dataframe
        :rtype: dict
        """
        new_row = dict(row)
        new_row['reqDay'] = req_time
        new_row['JobSuccess'] = True
        new_row['SiteName'] = 0
        new_row['DataType'] = 0
        new_row['FileType'] = 0
        if any(field not in new_row for field in self._CPU_FIELDS):
            (num_cpus, wall_time, cpu_time,
             single_cpu_time, io_time) = gen_fake_cpu_work()
            fake_work = {
                'NumCPU': num_cpus,
                'WrapWC': wall_time,
                'WrapCPU': cpu_time,
                'CPUTime': single_cpu_time,
                'IOTime': io_time,
            }
            for field, value in fake_work.items():
                new_row.setdefault(field, value)
        return new_row

    def bulk_append(self, rows: List[dict]):
        """Insert a bunch of rows into the day's dataframe.

        For each row it sets these default values:

        - reqDay = int(time.mktime(self._date.timetuple()))
        - JobSuccess = True
        - SiteName = 0
        - DataType = 0
        - FileType = 0

        Also, if the information about the job is missing, this function
        generates random fake information on cpu work using
        `gen_fake_cpu_work` for the missing fields among:

        - NumCPU
        - WrapWC
        - WrapCPU
        - CPUTime
        - IOTime

        The `Size` column is multiplied by 1024**2 before the insertion.
        The columns of the inserted rows are taken from the first row, so
        all the rows are expected to have the same keys. The dicts passed
        in `rows` are not modified. An empty list is a no-op.

        :param rows: List of rows
        :type rows: List[dict]
        :return: self
        :rtype: Day
        """
        if not rows:
            return self

        cur_req_time = self._req_time()
        prepared = [self._prepare_row(row, cur_req_time) for row in rows]
        new_df = pd.DataFrame(data={
            key: [elm[key] for elm in prepared]
            for key in prepared[0]
        })
        new_df['Size'] *= 1024**2  # Convert to bytes
        # DataFrame.append was removed in pandas 2.0: concat is the
        # supported replacement.
        self._df = pd.concat([self._df, new_df], ignore_index=True)
        return self

    def append(self, row: dict):
        """Insert a single row into the day's dataframe.

        It sets these default values:

        - reqDay = int(time.mktime(self._date.timetuple()))
        - JobSuccess = True
        - SiteName = 0
        - DataType = 0
        - FileType = 0

        If the information about the job is missing, this function
        generates random fake information on cpu work using
        `gen_fake_cpu_work` for the missing fields among:

        - NumCPU
        - WrapWC
        - WrapCPU
        - CPUTime
        - IOTime

        The `Size` value is multiplied by 1024**2 before the insertion.
        Only the keys listed in COLUMNS are stored; a column without a
        value in `row` is set to None. The dict passed in `row` is not
        modified.

        :param row: the current row's columns
        :type row: dict
        :return: self
        :rtype: Day
        """
        new_row = self._prepare_row(row, self._req_time())
        new_row['Size'] = new_row['Size'] * 1024**2  # Convert to bytes
        new_df = pd.DataFrame([{key: new_row.get(key) for key in COLUMNS}])
        # DataFrame.append was removed in pandas 2.0: concat is the
        # supported replacement.
        self._df = pd.concat([self._df, new_df], ignore_index=True)
        return self

    def save(self, dest_folder: 'PurePath' = Path(".")):
        """Export the current day dataframe in a zipped csv format.

        The file is named `dataset_<date>.csv.gz`; the compression is left
        to pandas, which by default infers it from the file extension.

        :param dest_folder: the destination directory, defaults to Path(".")
        :type dest_folder: PurePath, optional
        :return: self
        :rtype: Day
        """
        file_path = dest_folder.joinpath(f"dataset_{self._date}.csv.gz")
        self._df.to_csv(file_path, index=False)
        return self
class Generator(object):
    """The main generator object that creates datasets."""

    def __init__(self,
                 config: dict = None,
                 num_days: int = -1,
                 num_req_x_day: int = -1,
                 start_date: 'datetime.date' = datetime.date(2020, 1, 1),
                 seed: int = _DEFAULT_SEED,
                 dest_folder: 'PurePath' = Path("."),
                 ):
        """Initialize the generator.

        Every entry of `config` is stored as a private attribute named
        `_<key>`, so a config entry can override `start_date` and `seed`,
        while explicit `num_days`, `num_req_x_day` and `dest_folder`
        arguments win over the config. When `num_days` or `num_req_x_day`
        is left to -1 and the config does not provide it, the
        corresponding attribute is not set.

        :param config: A dictionary with the configuration to use, defaults to None (no configuration)
        :type config: dict, optional
        :param num_days: number of days to generate, defaults to -1
        :type num_days: int, optional
        :param num_req_x_day: number of requests per day, defaults to -1
        :type num_req_x_day: int, optional
        :param start_date: the starting date of the generator data, defaults to datetime.date(2020, 1, 1)
        :type start_date: datetime.date, optional
        :param seed: the random generator seed, defaults to _DEFAULT_SEED
        :type seed: int, optional
        :param dest_folder: the folder where to store the dataset, defaults to Path(".")
        :type dest_folder: PurePath, optional
        """
        self._start_date = start_date
        self._days = []
        self._seed = seed

        if config is not None:
            for key, val in config.items():
                setattr(self, f"_{key}", val)

        if num_days != -1:
            self._num_days = num_days
        if num_req_x_day != -1:
            self._num_req_x_day = num_req_x_day

        self._dest_folder = dest_folder
        self.__update_seeds()

    @property
    def seed(self):
        """The seed used for the random generators."""
        return self._seed

    @seed.setter
    def seed(self, value: int):
        """Change seed value and re-seed the random generators.

        :param value: new seed value
        :type value: int
        """
        assert isinstance(value, int), "ERROR: value in not an integer"
        self._seed = value
        self.__update_seeds()

    def __update_seeds(self):
        """Initialize the random generator seeds.

        Internal Python random generator seed and NumPy random seed.
        """
        random.seed(self._seed)
        np.random.seed(self._seed)

    @property
    def df(self) -> 'pd.DataFrame':
        """Returns a new dataframe that contains all the days' dataframes.

        A `Date` column is added, obtained by converting `reqDay` from
        seconds to datetime.

        :return: the concatenated dataframe
        :rtype: pd.DataFrame
        :raises ValueError: raised by pd.concat when there are no days
        """
        all_df = pd.concat([day.df for day in self._days])
        all_df['Date'] = pd.to_datetime(all_df.reqDay, unit='s')
        return all_df

    @property
    def df_stats(self) -> Tuple['pd.DataFrame', ...]:
        """Returns the concat days' dataframes and some useful stats.

        The returned tuple contains, in order:

        - the concatenated dataframe (see `df`)
        - the number of requests of each file (`Filename`, `# requests`)
        - the summed size of the distinct files of each day, divided
          by 1024**2 (`day`, `Size`)
        - the size of each distinct file, divided by 1024**2
          (`Filename`, `Size`)
        - the number of distinct files of each day
          (`reqDay`, `numFiles`, `day`)
        - the number of requests of each day (`reqDay`, `numReq`, `day`)

        :return: a tuple with several DataFrames
        :rtype: Tuple[pd.DataFrame, ...]
        """
        df = self.df

        # Build the frame explicitly: the column names produced by
        # value_counts().reset_index() differ between pandas versions.
        counts = df.Filename.value_counts()
        file_frequencies = pd.DataFrame({
            'Filename': counts.index,
            '# requests': counts.values,
        })

        all_day_file_size = {'day': [], 'Size': []}
        for idx, group in df.groupby('reqDay')[['Filename', 'Size']]:
            all_day_file_size['day'].append(pd.to_datetime(idx, unit="s"))
            # Each file is counted once per day.
            all_day_file_size['Size'].append(
                group.drop_duplicates('Filename').Size.sum() / 1024**2
            )
        all_day_file_size = pd.DataFrame(data=all_day_file_size)

        num_files = df.groupby('reqDay').Filename.nunique().reset_index()
        num_files['day'] = pd.to_datetime(num_files.reqDay, unit="s")
        num_files.rename(columns={'Filename': "numFiles"}, inplace=True)

        num_req = df.groupby('reqDay').size().reset_index(name='numReq')
        num_req['day'] = pd.to_datetime(num_req.reqDay, unit="s")

        file_sizes = df[['Filename', 'Size']].copy()
        file_sizes.drop_duplicates("Filename", inplace=True)
        file_sizes['Size'] = file_sizes['Size'] / 1024**2

        return (df, file_frequencies, all_day_file_size,
                file_sizes, num_files, num_req)

    @property
    def days(self) -> List['Day']:
        """Returns the list of generated (or loaded) days.

        :return: a list with the Day objects
        :rtype: List[Day]
        """
        return self._days

    @property
    def num_req_x_day(self):
        """The number of requests per day."""
        return self._num_req_x_day

    @num_req_x_day.setter
    def num_req_x_day(self, value: int):
        """Set the number of requests per day.

        :param value: number of requests
        :type value: int
        """
        assert isinstance(
            value, int), "ERROR: num req x day needs an integer value"
        self._num_req_x_day = value

    @property
    def num_days(self) -> int:
        """The number of days to generate."""
        return self._num_days

    @num_days.setter
    def num_days(self, value: int):
        """Set the number of days to generate.

        :param value: number of days
        :type value: int
        """
        assert isinstance(
            value, int), "ERROR: num days needs an integer value"
        self._num_days = value

    @property
    def tot_num_requests(self) -> int:
        """The total number of requests (days * requests per day)."""
        return self._num_days * self._num_req_x_day

    @property
    def dest_folder(self) -> PurePath:
        """The folder where the dataset is stored."""
        return self._dest_folder

    @dest_folder.setter
    def dest_folder(self, dest_folder: 'PurePath' = Path("./dataset")):
        """Change the destination folder.

        :param dest_folder: path of destination folder, defaults to Path("./dataset")
        :type dest_folder: PurePath, optional
        :raises TypeError: When folder is not a Path object
        """
        if not isinstance(dest_folder, Path):
            # TypeError is a subclass of Exception: callers that catch
            # Exception keep working.
            raise TypeError(
                "ERROR: destination folder is not a PurePath object...")
        self._dest_folder = dest_folder

    def clean(self):
        """Delete all day dataframes."""
        del self._days[:]

    def prepare(self, function_name: str, kwargs: dict,
                max_buf_len: int = 1024) -> Iterator[int]:
        """Prepare the dataset.

        This method calls the generator function `function_name` of the
        `functions` module and stores the produced days. It is a
        generator itself: nothing is produced until it is iterated.

        :param function_name: The function to use during the preparation
        :type function_name: str
        :param kwargs: arguments of generator function
        :type kwargs: dict
        :param max_buf_len: size of row buffer, defaults to 1024
        :type max_buf_len: int, optional
        :yield: status percentage of the preparation
        :rtype: int
        """
        # The function may have been added after the module was imported.
        if function_name not in dir(functions):
            importlib.reload(functions)

        cur_gen_obj = getattr(functions, function_name)(**kwargs)
        cur_gen_obj.num_req_x_day = self._num_req_x_day

        delta = datetime.timedelta(days=1)
        cur_date = self._start_date
        buffer = []

        for n_day in range(self._num_days):
            cur_day = Day(cur_date)
            cur_gen_obj.day_idx = n_day

            for n_req, (elm, percentage) in enumerate(
                cur_gen_obj.gen_day_elements(self._num_req_x_day)
            ):
                # Rows are inserted in bulk: a row-by-row append has low
                # performances.
                buffer.append(elm)
                if len(buffer) >= max_buf_len:
                    cur_day.bulk_append(buffer)
                    buffer = []

                if percentage is None:
                    yield int(
                        (n_day * self._num_req_x_day + n_req) /
                        self.tot_num_requests * 100.
                    )
                else:
                    yield int(
                        (percentage + (n_day * 100.)) /
                        (self._num_days * 100.) * 100.
                    )

            # Flush what is left and start the next day with an empty
            # buffer, otherwise these rows would end up in the next day too.
            if buffer:
                cur_day.bulk_append(buffer)
                buffer = []

            cur_day.reset_index()
            self._days.append(cur_day)
            cur_date = cur_date + delta

        yield 100

    def _open_dataset_file(self, filename: 'PurePath') -> 'Day':
        """Open a single dataset day.

        The date is parsed from the file name, that is expected to end
        with `_<year>-<month>-<day>.<extension>`.

        :param filename: path of the dataset file (a str is accepted too)
        :type filename: PurePath
        :return: the current day data
        :rtype: Day
        """
        file_path = Path(filename)
        date_str = file_path.name.rsplit("_", 1)[1].split(".", 1)[0]
        cur_date = datetime.date(*[int(elm) for elm in date_str.split("-")])
        df = pd.read_csv(file_path)
        return Day(cur_date, df)

    def open_data(self, folder: str):
        """Open dataset from a folder.

        All the files that match `*.csv*` are loaded, sorted by path so
        that the order of the days does not depend on the file system.

        :param folder: the dataset folder
        :type folder: str
        """
        for file_ in sorted(Path(folder).resolve().glob("*.csv*")):
            self._days.append(
                self._open_dataset_file(file_)
            )

    def save(self) -> Iterator[int]:
        """Exports all days' DataFrames in dest_folder.

        If the destination folder already exists it is deleted and
        created again. This method is a generator: nothing is written
        until it is iterated.

        :yield: status percentage of the export
        :rtype: int
        """
        if self._dest_folder.exists():
            shutil.rmtree(self._dest_folder)
        self._dest_folder.mkdir(parents=True)

        for idx, day in enumerate(self._days, 1):
            day.save(self._dest_folder)
            yield int(float(idx / len(self._days) * 100.))