|
import PyPDF2 |
|
from openpyxl import load_workbook |
|
from pptx import Presentation |
|
import gradio as gr |
|
import io |
|
from huggingface_hub import InferenceClient |
|
import re |
|
import zipfile |
|
import xml.etree.ElementTree as ET |
|
|
|
|
|
# Maximum number of characters of document text placed in a single prompt;
# also the default slice width used by split_content.
CHUNK_SIZE = 32_000

# Passed as ``max_new_tokens`` on every text-generation request.
MAX_NEW_TOKENS = 4_096

# Shared inference client; generate_mistral_response sends all requests
# through it.
client = InferenceClient("mistralai/Mistral-Nemo-Instruct-2407")
|
|
|
|
|
|
|
def xml2text(xml):
    """Return the text of every element in an XML document.

    Args:
        xml: The XML document (``str`` or ``bytes``), parsed with
            ``xml.etree.ElementTree.fromstring``.

    Returns:
        The ``.text`` of each element, in iteration order, each followed by
        one space. Elements without text contribute nothing, and tail text
        (text that follows a closing tag) is not included.
    """
    root = ET.fromstring(xml)
    pieces = [node.text + " " for node in root.iter() if node.text is not None]
    return "".join(pieces)
|
|
|
def clean_text(content):
    """Collapse every run of whitespace in *content* into a single space.

    Newlines, carriage returns and tabs are whitespace too, so the result
    sits on one line. Leading and trailing whitespace is reduced to one
    space rather than stripped.
    """
    # "\n", "\r" and "\t" all match \s, so a single substitution both
    # replaces them and squeezes repeated whitespace.
    return re.sub(r'\s+', ' ', content)
|
|
|
|
|
def split_content(content, chunk_size=CHUNK_SIZE):
    """Cut *content* into consecutive slices of at most *chunk_size* items.

    Args:
        content: The sequence (here, document text) to split.
        chunk_size: Width of each slice; defaults to ``CHUNK_SIZE``.

    Returns:
        A list of slices in order. Every slice except possibly the last has
        exactly *chunk_size* items; empty *content* gives an empty list.
    """
    starts = range(0, len(content), chunk_size)
    return [content[start:start + chunk_size] for start in starts]
|
|
|
|
|
|
|
def extract_text_from_docx(docx_data, clean=True):
    """Extract header, body and footer text from a DOCX file.

    Text is gathered from every ``word/header*.xml`` member, then
    ``word/document.xml``, then every ``word/footer*.xml`` member, in the
    order the archive lists them.

    Args:
        docx_data: Raw bytes of the .docx file (a ZIP archive).
        clean: When true, the result is passed through ``clean_text``.

    Returns:
        A ``(text, length)`` tuple where ``length`` is ``len(text)``.

    Raises:
        zipfile.BadZipFile: If *docx_data* is not a ZIP archive.
        KeyError: If the archive has no ``word/document.xml`` member.
    """
    # Escaped dot + fullmatch: only names like "word/header1.xml" qualify,
    # not names that merely start with something similar.
    header_re = re.compile(r'word/header[0-9]*\.xml')
    footer_re = re.compile(r'word/footer[0-9]*\.xml')

    # The context manager closes the archive even if a member is missing or
    # its XML fails to parse.
    with zipfile.ZipFile(io.BytesIO(docx_data)) as zipf:
        names = zipf.namelist()
        parts = [
            xml2text(zipf.read(name))
            for name in names
            if header_re.fullmatch(name)
        ]
        parts.append(xml2text(zipf.read('word/document.xml')))
        parts.extend(
            xml2text(zipf.read(name))
            for name in names
            if footer_re.fullmatch(name)
        )

    text = ''.join(parts)
    if clean:
        text = clean_text(text)
    return text, len(text)
|
|
|
def extract_text_from_pptx(pptx_data, clean=True):
    """Extract speaker-notes and slide text from a PPTX file.

    Text is gathered from every ``ppt/notesSlides/notesSlide*.xml`` member
    first, then from every ``ppt/slides/slide*.xml`` member, in the order
    the archive lists them.

    Args:
        pptx_data: Raw bytes of the .pptx file (a ZIP archive).
        clean: When true, the result is passed through ``clean_text``.

    Returns:
        A ``(text, length)`` tuple where ``length`` is ``len(text)``.

    Raises:
        zipfile.BadZipFile: If *pptx_data* is not a ZIP archive.
    """
    # Escaped dot + fullmatch: only names like "ppt/slides/slide3.xml"
    # qualify, not names that merely start with something similar.
    notes_re = re.compile(r'ppt/notesSlides/notesSlide[0-9]*\.xml')
    slide_re = re.compile(r'ppt/slides/slide[0-9]*\.xml')

    # The context manager closes the archive even if a member's XML fails
    # to parse.
    with zipfile.ZipFile(io.BytesIO(pptx_data)) as zipf:
        names = zipf.namelist()
        parts = [
            xml2text(zipf.read(name))
            for name in names
            if notes_re.fullmatch(name)
        ]
        parts.extend(
            xml2text(zipf.read(name))
            for name in names
            if slide_re.fullmatch(name)
        )

    text = ''.join(parts)
    if clean:
        text = clean_text(text)
    return text, len(text)
|
|
|
def read_document(file, clean=True):
    """Read a document and return its text together with its length.

    The reader is picked from the file extension: ``pdf`` (PyPDF2),
    ``xlsx`` (openpyxl), ``pptx``, ``doc``/``docx``; any other extension is
    decoded as UTF-8 text.

    Args:
        file: The uploaded file: either an object whose ``name`` attribute
            is the path on disk, or the path itself as a string.
        clean: When true, whitespace in the extracted text is normalised
            with ``clean_text``.

    Returns:
        A ``(content, length)`` tuple. On success ``length`` is
        ``len(content)``. On any failure ``content`` is an
        ``"Error reading ..."`` message and ``length`` is 0.
    """
    if file is None:
        # Nothing was uploaded; report it in the same shape as other errors.
        return "Error reading file: no file provided", 0

    # Accept both file-like wrappers (with .name) and plain path strings.
    file_path = getattr(file, "name", file)
    file_extension = file_path.split('.')[-1].lower()

    try:
        with open(file_path, "rb") as f:
            file_content = f.read()
    except OSError as e:
        return f"Error reading file: {e}", 0

    if file_extension == 'pdf':
        try:
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_content))
            # "or ''" guards against a page that yields no text at all.
            content = ''.join(
                page.extract_text() or '' for page in pdf_reader.pages
            )
            if clean:
                content = clean_text(content)
            return content, len(content)
        except Exception as e:
            return f"Error reading PDF: {e}", 0

    elif file_extension == 'xlsx':
        try:
            wb = load_workbook(io.BytesIO(file_content))
            cells = []
            for sheet in wb.worksheets:
                for row in sheet.rows:
                    for cell in row:
                        if cell.value is not None:
                            cells.append(str(cell.value) + ' ')
            content = ''.join(cells)
            if clean:
                content = clean_text(content)
            return content, len(content)
        except Exception as e:
            return f"Error reading XLSX: {e}", 0

    elif file_extension == 'pptx':
        try:
            return extract_text_from_pptx(file_content, clean)
        except Exception as e:
            return f"Error reading PPTX: {e}", 0

    elif file_extension in ('doc', 'docx'):
        # Both extensions go through the DOCX (ZIP-based) extractor; a file
        # that is not a ZIP archive ends up in the error branch below.
        try:
            return extract_text_from_docx(file_content, clean)
        except Exception as e:
            return f"Error reading DOC/DOCX: {e}", 0

    else:
        try:
            content = file_content.decode('utf-8')
            if clean:
                content = clean_text(content)
            return content, len(content)
        except Exception as e:
            return f"Error reading file: {e}", 0
|
|
|
|
|
|
|
|
|
def generate_mistral_response(message):
    """Stream a completion for *message*, yielding the text produced so far.

    Args:
        message: The full prompt passed to ``client.text_generation``.

    Yields:
        The accumulated output after each new token, so every value extends
        the previous one. ``"</s>"`` tokens are skipped and never appear in
        the output.
    """
    token_stream = client.text_generation(
        message,
        max_new_tokens=MAX_NEW_TOKENS,
        stream=True,
        details=True,
        return_full_text=False,
    )
    generated = ""
    for event in token_stream:
        piece = event.token.text
        if piece == "</s>":
            # End-of-sequence marker, not part of the answer.
            continue
        generated += piece
        yield generated
|
|
|
|
|
def chat_document(file, question, clean=True):
    """Answer *question* about an uploaded document with one model call.

    Only the first ``CHUNK_SIZE`` characters of the document are included
    in the prompt.

    Args:
        file: Uploaded file, forwarded to ``read_document``.
        question: The user's question about the document.
        clean: Forwarded to ``read_document``.

    Yields:
        Progressively longer answer text from ``generate_mistral_response``.
        If the document could not be read or is empty, a single message is
        yielded and no model call is made.
    """
    content, length = read_document(file, clean)
    if length == 0:
        # read_document reports failures as (message, 0), and an empty
        # document also has length 0. Either way there is nothing to ask the
        # model about, so surface the message instead of sending it as the
        # "document".
        yield content or "The document is empty."
        return
    if length > CHUNK_SIZE:
        content = content[:CHUNK_SIZE]

    system_prompt = """
    You are a helpful and informative assistant that can answer questions based on the content of documents.
    You will receive the content of a document and a question about it.
    Your task is to provide a concise and accurate answer to the question based solely on the provided document content.
    If the document does not contain enough information to answer the question, simply state that you cannot answer the question based on the provided information.
    """

    # NOTE(review): the prompt opens with [INST] but never emits a closing
    # [/INST]; confirm this against the model's chat template.
    message = f"""[INST] [SYSTEM] {system_prompt}
    Document Content: {content}
    Question: {question}
    Answer:"""

    yield from generate_mistral_response(message)
|
|
|
|
|
def chat_document_v2(file, question, clean=True):
    """Answer *question* chunk by chunk, then stream a summary of the answers.

    The document is split with ``split_content``; each chunk is sent to the
    model with the question, and the collected per-chunk answers are then
    condensed by a final summarisation call.

    Args:
        file: Uploaded file, forwarded to ``read_document``.
        question: The user's question about the document.
        clean: Forwarded to ``read_document``.

    Yields:
        Progressively longer summary text from
        ``generate_mistral_response``. If the document could not be read or
        is empty, a single message is yielded and no model call is made.
    """
    content, length = read_document(file, clean)
    if length == 0:
        # read_document reports failures as (message, 0), and an empty
        # document also has length 0. Do not query the model about an error
        # message or summarise zero answers.
        yield content or "The document is empty."
        return
    # split_content defaults to CHUNK_SIZE-wide slices, so each chunk already
    # fits the per-prompt limit.
    chunks = split_content(content)

    system_prompt = """
    You are a helpful and informative assistant that can answer questions based on the content of documents.
    You will receive the content of a document and a question about it.
    Your task is to provide a concise and accurate answer to the question based solely on the provided document content.
    If the document does not contain enough information to answer the question, simply state that you cannot answer the question based on the provided information.
    """

    all_answers = []
    for chunk in chunks:
        message = f"""[INST] [SYSTEM] {system_prompt}
    Document Content: {chunk}
    Question: {question}
    Answer:"""

        # The generator yields cumulative text; only the final value (the
        # complete answer for this chunk) is kept.
        response = ""
        for stream_response in generate_mistral_response(message):
            response = stream_response
        all_answers.append(response)

    summary_prompt = """
    You are a helpful and informative assistant that can summarize multiple answers related to the same question.
    You will receive a list of answers to a question, and your task is to generate a concise and comprehensive summary that incorporates the key information from all the answers.
    Avoid repeating information unnecessarily and focus on providing the most relevant and accurate summary based on the provided answers.

    Answers:
    """

    all_answers_str = "\n".join(all_answers)
    # The joined answers are capped at 30000 characters before being placed
    # in the summary prompt.
    summary_message = f"""[INST] [SYSTEM] {summary_prompt}
    {all_answers_str[:30000]}
    Summary:"""

    yield from generate_mistral_response(summary_message)
|
|
|
|
|
|
|
|
|
# Three-tab UI: raw text extraction, single-call chat, and chunked chat.
with gr.Blocks() as demo:
    with gr.Tabs():
        # Tab 1: show the extracted text and its character count.
        with gr.TabItem("Document Reader"):
            iface1 = gr.Interface(
                fn=read_document,
                inputs=[
                    gr.File(label="Upload a Document"),
                    gr.Checkbox(label="Clean Text", value=True),
                ],
                outputs=[
                    gr.Textbox(label="Document Content"),
                    gr.Number(label="Document Length (characters)"),
                ],
                title="Document Reader",
                description="Upload a document (PDF, XLSX, PPTX, TXT, CSV, DOC, DOCX and Code or text file) to read its content.",
            )

        # Tab 2: question answering with one model call.
        with gr.TabItem("Document Chat"):
            iface2 = gr.Interface(
                fn=chat_document,
                inputs=[
                    gr.File(label="Upload a Document"),
                    gr.Textbox(label="Question"),
                    gr.Checkbox(label="Clean and Compress Text", value=True),
                ],
                outputs=gr.Markdown(label="Answer"),
                title="Document Chat",
                description="Upload a document and ask questions about its content.",
            )

        # Tab 3: question answering over chunks, followed by a summary.
        with gr.TabItem("Document Chat V2"):
            iface3 = gr.Interface(
                fn=chat_document_v2,
                inputs=[
                    gr.File(label="Upload a Document"),
                    gr.Textbox(label="Question"),
                    gr.Checkbox(label="Clean Text", value=True),
                ],
                outputs=gr.Markdown(label="Answer"),
                title="Document Chat V2",
                description="Upload a document and ask questions about its content (using chunk-based approach).",
            )

# Start the web app as soon as the module runs.
demo.launch()