metric / loaders.py
Elron's picture
Upload loaders.py with huggingface_hub
9564cbf
raw
history blame
3.91 kB
import os
from tempfile import TemporaryDirectory
from typing import Dict, Mapping, Optional, Sequence, Union
import pandas as pd
from datasets import load_dataset as hf_load_dataset
from tqdm import tqdm
from .operator import SourceOperator
from .stream import MultiStream, Stream
# ibm_boto3 is an optional dependency: record whether it could be imported in
# `ibm_boto3_available`, which LoadFromIBMCloud.verify checks before the SDK is used.
try:
    import ibm_boto3
    from ibm_botocore.client import ClientError
    ibm_boto3_available = True
except ImportError:
    ibm_boto3_available = False
class Loader(SourceOperator):
    """Common base class for the loaders defined in this module.

    Adds nothing to ``SourceOperator``; it only gives LoadHF, LoadCSV and
    LoadFromIBMCloud a shared parent type.
    """
class LoadHF(Loader):
    """Load a dataset with ``datasets.load_dataset`` and wrap it in a MultiStream.

    The fields below are forwarded unchanged to ``datasets.load_dataset``.
    """

    # Passed as the first positional argument to ``load_dataset``.
    path: str
    # Forwarded as the ``name`` keyword argument.
    name: Optional[str] = None
    # Forwarded as the ``data_dir`` keyword argument.
    data_dir: Optional[str] = None
    # Forwarded as the ``data_files`` keyword argument.
    data_files: Optional[Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]] = None
    # Forwarded as the ``streaming`` keyword argument.
    streaming: bool = True
    # Not read anywhere in this class; presumably consumed by the base operator -- TODO confirm.
    cached = False

    def process(self):
        """Return a MultiStream built from the loaded dataset."""
        load_kwargs = {
            "name": self.name,
            "data_dir": self.data_dir,
            "data_files": self.data_files,
            "streaming": self.streaming,
        }
        loaded = hf_load_dataset(self.path, **load_kwargs)
        return MultiStream.from_iterables(loaded)
class LoadCSV(Loader):
    """Expose CSV files as a MultiStream with one stream per entry in ``files``.

    Each stream yields one ``dict`` per CSV row, keyed by column name.
    """

    # Maps stream name -> CSV file location handed to ``pandas.read_csv``.
    files: Dict[str, str]
    # Number of rows pandas reads per chunk, so a file is not loaded all at once.
    chunksize: int = 1000

    def load_csv(self, file):
        """Yield the rows of ``file`` one at a time as dictionaries.

        Args:
            file: Location of the CSV file, passed straight to ``pandas.read_csv``.

        Yields:
            dict: Column name -> value for a single row.
        """
        for chunk in pd.read_csv(file, chunksize=self.chunksize):
            # ``to_dict(orient="records")`` converts column by column, so each value
            # keeps its column's type. ``iterrows()`` builds one Series per row, which
            # upcasts mixed numeric columns (e.g. ints become floats) and is slower.
            yield from chunk.to_dict(orient="records")

    def process(self):
        """Return a MultiStream holding one lazily generated Stream per configured file."""
        return MultiStream(
            {name: Stream(generator=self.load_csv, gen_kwargs={"file": file}) for name, file in self.files.items()}
        )
class LoadFromIBMCloud(Loader):
    """Download files from an IBM Cloud Object Storage (COS) bucket and load them.

    The endpoint URL and credentials are read in ``prepare`` from the environment
    variables whose names are given by the ``*_env`` fields. ``process`` downloads
    every entry of ``data_files`` from ``<data_dir>/<file>`` in ``bucket_name`` into
    a temporary directory and loads that directory with ``datasets.load_dataset``.
    """

    # Name of the environment variable holding the COS endpoint URL.
    endpoint_url_env: str
    # Name of the environment variable holding the access key id.
    aws_access_key_id_env: str
    # Name of the environment variable holding the secret access key.
    aws_secret_access_key_env: str
    # Bucket the files are downloaded from.
    bucket_name: str
    # Key prefix inside the bucket; joined to each data file name with "/".
    data_dir: str
    # File names (relative to ``data_dir``) to download.
    data_files: Sequence[str]

    def _download_from_cos(self, cos, bucket_name, item_name, local_file):
        """Download one object from COS to ``local_file``, showing a progress bar.

        Args:
            cos: The resource object returned by ``ibm_boto3.resource``.
            bucket_name: Bucket that contains the object.
            item_name: Key of the object inside the bucket.
            local_file: Destination path on the local file system.

        Raises:
            Exception: If the object cannot be accessed or downloaded; the
                underlying error is attached as ``__cause__``.
        """
        print(f"Downloading {item_name} from {bucket_name} COS to {local_file}")
        try:
            # The object size is used as the progress bar total.
            response = cos.Object(bucket_name, item_name).get()
            size = response["ContentLength"]
        except Exception as e:
            raise Exception(f"Unable to access {item_name} in {bucket_name} in COS") from e

        # The context manager closes the bar even when the download fails.
        with tqdm(total=size, unit="iB", unit_scale=True) as progress_bar:
            try:
                # The callback is passed to ``progress_bar.update``, which advances the bar.
                cos.Bucket(bucket_name).download_file(item_name, local_file, Callback=progress_bar.update)
            except Exception as e:
                raise Exception(f"Unable to download {item_name} in {bucket_name}") from e
        print("\nDownload Successful")

    def prepare(self):
        """Resolve the endpoint URL and credentials from the configured environment variables."""
        super().prepare()
        self.endpoint_url = os.getenv(self.endpoint_url_env)
        self.aws_access_key_id = os.getenv(self.aws_access_key_id_env)
        self.aws_secret_access_key = os.getenv(self.aws_secret_access_key_env)

    def verify(self):
        """Check that the IBM SDK was imported and all required environment variables are set.

        Raises:
            AssertionError: If ``ibm_boto3`` is missing or a variable is unset.
        """
        super().verify()
        # NOTE(review): ``assert`` statements are skipped under ``python -O``; they are
        # kept here so callers that catch AssertionError keep working.
        assert (
            ibm_boto3_available
        ), "Please install ibm_boto3 in order to use the LoadFromIBMCloud loader (using `pip install ibm-cos-sdk`) "
        assert self.endpoint_url is not None, f"Please set the {self.endpoint_url_env} environmental variable"
        assert self.aws_access_key_id is not None, f"Please set {self.aws_access_key_id_env} environmental variable"
        assert (
            self.aws_secret_access_key is not None
        ), f"Please set {self.aws_secret_access_key_env} environmental variable"

    def process(self):
        """Download all configured files and return them as a MultiStream."""
        cos = ibm_boto3.resource(
            "s3",
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            endpoint_url=self.endpoint_url,
        )

        with TemporaryDirectory() as temp_directory:
            for data_file in self.data_files:
                # The object key is joined with "/" explicitly; the local path
                # uses the platform's separator.
                self._download_from_cos(
                    cos,
                    self.bucket_name,
                    self.data_dir + "/" + data_file,
                    os.path.join(temp_directory, data_file),
                )
            # Non-streaming load; presumably chosen because the temporary directory
            # is deleted as soon as this block exits.
            dataset = hf_load_dataset(temp_directory, streaming=False)
            return MultiStream.from_iterables(dataset)