import torch from transformers import ( AutoTokenizer, AutoModelForCausalLM, AutoModelForSeq2SeqLM, AutoProcessor, AutoModelForSpeechSeq2Seq, AutoModelForTextToWaveform ) from diffusers import DiffusionPipeline import time import os from dotenv import load_dotenv from huggingface_hub import HfApi, HfFolder, Repository import gradio as gr load_dotenv() def prune_model(model, amount=0.5): from torch.nn.utils import prune for name, module in model.named_modules(): if isinstance(module, (torch.nn.Linear, torch.nn.Conv2d)): prune.l1_unstructured(module, name='weight', amount=amount) prune.remove(module, 'weight') return model def quantize_to_q1_with_min(tensor, min_value=-1): tensor = torch.sign(tensor) tensor[tensor < min_value] = min_value return tensor def quantize_model_to_q1_with_min(model, min_value=-1): for name, param in model.named_parameters(): if param.dtype in [torch.float32, torch.float16]: with torch.no_grad(): param.copy_(quantize_to_q1_with_min(param.data, min_value)) def disable_unnecessary_components(model): for name, module in model.named_modules(): if isinstance(module, torch.nn.Dropout): module.p = 0.0 elif isinstance(module, torch.nn.BatchNorm1d): module.eval() def ultra_max_compress(model): model = prune_model(model, amount=0.8) quantize_model_to_q1_with_min(model, min_value=-0.05) disable_unnecessary_components(model) with torch.no_grad(): for name, param in model.named_parameters(): if param.requires_grad: param.requires_grad = False param.data = torch.nn.functional.hardtanh(param.data, min_val=-1.0, max_val=1.0) param.data = param.data.half() try: model = torch.jit.script(model) except Exception: pass prune_model(model, amount=0.9) model.eval() for buffer_name, buffer in model.named_buffers(): if buffer.numel() == 0: model._buffers.pop(buffer_name) return model def optimize_model_resources(model): torch.set_grad_enabled(False) model.eval() for name, param in model.named_parameters(): param.requires_grad = False if param.dtype == torch.float32: param.data = param.data.half() if hasattr(model, 'config'): if hasattr(model.config, 'max_position_embeddings'): model.config.max_position_embeddings = min(model.config.max_position_embeddings, 512) if hasattr(model.config, 'hidden_size'): model.config.hidden_size = min(model.config.hidden_size, 768) model = torch.jit.optimize_for_inference(model) return model def generate_random_responses(model, tokenizer, prompt, num_responses=5, max_length=50): responses = [] for _ in range(num_responses): input_ids = tokenizer.encode(prompt, return_tensors="pt") output = model.generate(input_ids, max_length=max_length, do_sample=True, top_k=50) response = tokenizer.decode(output[0], skip_special_tokens=True) responses.append(response) return responses def patched_distilbert_forward(self, input_ids=None, attention_mask=None, head_mask=None, inputs_embeds=None, output_attentions=None, output_hidden_states=None, return_dict=None): return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = DistilBertModel.forward(self, input_ids, attention_mask=attention_mask, head_mask=head_mask, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict) if not return_dict: output_tuple = [] for v in [outputs.last_hidden_state, outputs.hidden_states, outputs.attentions]: if v is not None: output_tuple.append(v) return tuple(output_tuple) return outputs def patched_forward(self, input_ids=None, attention_mask=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None): return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.distilbert(input_ids, attention_mask=attention_mask, head_mask=head_mask, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict) hidden_state = outputs[0] pooled_output = self.pre_classifier(hidden_state[:, 0]) pooled_output = self.dropout(pooled_output) logits = self.classifier(pooled_output) if not return_dict: output = (logits,) + outputs[1:] return output return logits def patched_roberta_forward(self, input_ids=None, attention_mask=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None): return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.roberta(input_ids, attention_mask=attention_mask, head_mask=head_mask, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict) hidden_state = outputs[0] pooled_output = hidden_state[:, 0] pooled_output = self.dropout(pooled_output) logits = self.classifier(pooled_output) if not return_dict: output = (logits,) + outputs[1:] return output return logits def optimize_for_low_resources(model): model = ultra_max_compress(model) model = optimize_model_resources(model) model.config.max_position_embeddings = 256 model.config.hidden_size = 384 return model def optimize_for_very_low_resources(model): model = ultra_max_compress(model) model = optimize_model_resources(model) model.config.max_position_embeddings = 128 model.config.hidden_size = 256 return model def remove_unused_model_components(model): for name, param in model.named_parameters(): if param.numel() == 0: model._parameters.pop(name) return model def auto_train_model(model, train_data, epochs=3): optimizer = torch.optim.Adam(model.parameters(), lr=1e-5) model.train() for epoch in range(epochs): for batch in train_data: inputs, labels = batch optimizer.zero_grad() outputs = model(**inputs, labels=labels) loss = outputs.loss loss.backward() optimizer.step() return model def apply_extreme_filters(model): model = ultra_max_compress(model) model = optimize_model_resources(model) model.config.max_position_embeddings = 128 model.config.hidden_size = 256 model = torch.jit.optimize_for_inference(model) model = prune_model(model, amount=0.95) quantize_model_to_q1_with_min(model, min_value=-0.1) return model def reduce_latency(model, tokenizer, prompt, num_responses=5, max_length=50): responses = [] start_time = time.time() for _ in range(num_responses): input_ids = tokenizer.encode(prompt, return_tensors="pt") output = model.generate(input_ids, max_length=max_length, do_sample=True, top_k=50) response = tokenizer.decode(output[0], skip_special_tokens=True) responses.append(response) end_time = time.time() latency = (end_time - start_time) / num_responses * 1000 return responses, latency def create_gpt_distill_model(): gpt_model = GPT2LMHeadModel.from_pretrained("gpt2") gpt_tokenizer = AutoTokenizer.from_pretrained("gpt2") return gpt_model, gpt_tokenizer def create_gemma_distill_model(): gemma_model = AutoModelForCausalLM.from_pretrained("google/gemma-2-9b") gemma_tokenizer = AutoTokenizer.from_pretrained("google/gemma-2-9b") return gemma_model, gemma_tokenizer def measure_performance(model, tokenizer, sequence_length=20, num_tokens=100): inputs = tokenizer("A" * sequence_length, return_tensors="pt") start_time = time.time() for _ in range(num_tokens): model.generate(**inputs) end_time = time.time() latency = (end_time - start_time) / num_tokens * 1000 tokens_per_second = num_tokens / (end_time - start_time) return latency, tokens_per_second def apply_diffusion_pipeline(prompt): diffusion_pipeline = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-schnell") images = diffusion_pipeline(prompt).images return images def generate_responses_with_diffusion(prompt, use_diffusion): if "imagina" in prompt.lower() or "imagine" in prompt.lower(): images = apply_diffusion_pipeline(prompt) return images return None def generate_summary_with_bart(prompt): tokenizer = AutoTokenizer.from_pretrained("facebook/bart-large-cnn") model = AutoModelForSeq2SeqLM.from_pretrained("facebook/bart-large-cnn") inputs = tokenizer.encode(prompt, return_tensors="pt") summary_ids = model.generate(inputs, max_length=130, min_length=30, length_penalty=2.0, num_beams=4, early_stopping=True) summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True) return summary def generate_responses_with_bart(prompt): if "resumir" in prompt.lower() or "resumime" in prompt.lower(): summary = generate_summary_with_bart(prompt) return summary return None def apply_whisper_pipeline(prompt): processor = AutoProcessor.from_pretrained("openai/whisper-small") model = AutoModelForSpeechSeq2Seq.from_pretrained("openai/whisper-small") inputs = processor(prompt, return_tensors="pt") outputs = model.generate(**inputs) transcription = processor.batch_decode(outputs, skip_special_tokens=True) return transcription def generate_transcription_with_whisper(prompt): if "transcribe" in prompt.lower() or "transcribime" in prompt.lower(): transcription = apply_whisper_pipeline(prompt) return transcription return None def apply_translation_pipeline(prompt): tokenizer = AutoTokenizer.from_pretrained("google-t5/t5-base") model = AutoModelForSeq2SeqLM.from_pretrained("google-t5/t5-base") inputs = tokenizer.encode(prompt, return_tensors="pt") translated_ids = model.generate(inputs, max_length=50) translated_text = tokenizer.decode(translated_ids[0], skip_special_tokens=True) return translated_text def generate_translation_with_t5(prompt): if "traducir" in prompt.lower() or "traducime" in prompt.lower(): translation = apply_translation_pipeline(prompt) return translation return None def apply_musicgen_pipeline(prompt): tokenizer = AutoTokenizer.from_pretrained("facebook/musicgen-small") model = AutoModelForTextToWaveform.from_pretrained("facebook/musicgen-small") inputs = tokenizer(prompt, return_tensors="pt") audio = model.generate(inputs) return audio def generate_music_with_musicgen(prompt): if "música" in prompt.lower() or "canción" in prompt.lower(): music = apply_musicgen_pipeline(prompt) return music return None def apply_musicgen_melody_pipeline(prompt): tokenizer = AutoTokenizer.from_pretrained("facebook/musicgen-melody") model = AutoModelForTextToWaveform.from_pretrained("facebook/musicgen-melody") inputs = tokenizer(prompt, return_tensors="pt") audio = model.generate(inputs) return audio def generate_music_with_musicgen_melody(prompt): if "melodía" in prompt.lower() or "melodia" in prompt.lower(): music = apply_musicgen_melody_pipeline(prompt) return music return None def apply_stable_diffusion_pipeline(prompt): pipeline = DiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2-1") images = pipeline(prompt).images return images def generate_responses_with_stable_diffusion(prompt): if "imagen" in prompt.lower() or "image" in prompt.lower(): images = apply_stable_diffusion_pipeline(prompt) return images return None def unify_models(*models): combined_model = torch.nn.ModuleList(models) return combined_model def combined_filter(model): model = ultra_max_compress(model) model = optimize_model_resources(model) model.config.max_position_embeddings = 128 model.config.hidden_size = 256 model = torch.jit.optimize_for_inference(model) model = prune_model(model, amount=0.95) quantize_model_to_q1_with_min(model, min_value=-0.1) return model def apply_filters_and_unify(model): model = combined_filter(model) model = remove_unused_model_components(model) return model def upload_to_huggingface(model, repo_name): api = HfApi() try: api.create_repo(repo_id=repo_name, repo_type="model") except Exception: pass model.save_pretrained(repo_name) tokenizer.save_pretrained(repo_name) repo = Repository(repo_name) repo.push_to_hub() def apply_extreme_filters_and_upload(model, repo_name): model = apply_extreme_filters(model) upload_to_huggingface(model, repo_name) def start_gradio_interface(): def process_prompt(prompt): response = { "summary": generate_responses_with_bart(prompt), "transcription": generate_transcription_with_whisper(prompt), "translation": generate_translation_with_t5(prompt), "music": generate_music_with_musicgen(prompt), "melody_music": generate_music_with_musicgen_melody(prompt), "image": generate_responses_with_stable_diffusion(prompt), "diffusion": generate_responses_with_diffusion(prompt, True) } return response interface = gr.Interface( fn=process_prompt, inputs=gr.Textbox(label="Enter Prompt"), outputs=[gr.Textbox(label="Summary"), gr.Textbox(label="Transcription"), gr.Textbox(label="Translation"), gr.Audio(label="Music"), gr.Audio(label="Melody Music"), gr.Image(label="Image"), gr.Image(label="Diffusion")], title="Multi-Function AI Model", description="Generate summaries, transcriptions, translations, music, melodies, images, and diffusion responses." ) interface.launch() start_gradio_interface() model_infos = [ {"model_name": "gpt2", "class": GPT2LMHeadModel}, {"model_name": "google/gemma-2-9b", "class": AutoModelForCausalLM} ] for model_info in model_infos: model = model_info["class"].from_pretrained(model_info["model_name"]) tokenizer = AutoTokenizer.from_pretrained(model_info["model_name"]) optimized_model, responses, latency = optimize_model_with_all_optimizations(model, tokenizer, "Sample prompt for optimization.") print(f"Model: {model_info['model_name']}") print(f"Latency: {latency:.2f} ms") print(f"Sample Responses: {responses}") gpt_model, gpt_tokenizer = create_gpt_distill_model() gemma_model, gemma_tokenizer = create_gemma_distill_model() optimized_gpt_model, gpt_responses, gpt_latency = optimize_model_with_all_optimizations(gpt_model, gpt_tokenizer, "Sample prompt for GPT optimization.") optimized_gemma_model, gemma_responses, gemma_latency = optimize_model_with_all_optimizations(gemma_model, gemma_tokenizer, "Sample prompt for Gemma optimization.") combined_model = unify_models(optimized_gpt_model, optimized_gemma_model) optimized_gpt_model_1gb = optimize_for_1gb_ram(optimized_gpt_model) optimized_gemma_model_1gb = optimize_for_1gb_ram(optimized_gemma_model) optimized_gpt_model_low = optimize_for_very_low_resources(optimized_gpt_model) optimized_gemma_model_low = optimize_for_very_low_resources(optimized_gemma_model) optimized_gpt_model_cpu = optimize_for_old_cpu(optimized_gpt_model) optimized_gemma_model_cpu = optimize_for_old_cpu(optimized_gemma_model) optimized_gpt_model_gpu = optimize_for_old_gpu(optimized_gpt_model) optimized_gemma_model_gpu = optimize_for_old_gpu(optimized_gemma_model) print("Models optimized for various resource constraints.") diffusion_response = generate_responses_with_diffusion("Imagine a serene landscape", True) if diffusion_response: print("Diffusion response generated.") summary_response = generate_responses_with_bart("Resumir este texto para obtener un resumen efectivo.", True) if summary_response: print("Summary response generated.") transcription_response = generate_transcription_with_whisper("Transcribe this audio file.", True) if transcription_response: print("Transcription response generated.") translation_response = generate_translation_with_t5("Traducir este texto al inglés.", True) if translation_response: print("Translation response generated.") music_response = generate_music_with_musicgen("Música para una tarde tranquila.", True) if music_response: print("Music response generated.") melody_music_response = generate_music_with_musicgen_melody("Melodía para relajación.", True) if melody_music_response: print("Melody music response generated.") image_response = generate_responses_with_stable_diffusion("Imagen de un paisaje sereno.", True) if image_response: print("Image response generated.") upload_to_hf.rst.immbined_model, "Ffftdtd5dtft/my_model")