import os
from typing import Dict, Optional

from jamaibase import JamAI, protocol as p
class AudioProcessor:
    """Upload audio files to JamAI Base and read back the generated text.

    Each file is uploaded through the client's file API and then added as a
    row to the "AudioProcessor" action table; the "Transcription" and
    "Summary" columns of the first returned row are handed back to the caller.
    """

    def __init__(self, project_id: str, pat: str):
        """Create the JamAI client used for uploads and table calls.

        Args:
            project_id: Project ID passed to the client.
            pat: Personal Access Token, passed to the client as ``token``.
        """
        self.client = JamAI(
            project_id=project_id,
            token=pat
        )

    def validate_audio(self, audio_path: str) -> bool:
        """Check that the path exists and ends in a supported extension.

        Args:
            audio_path: Path of the audio file to check.

        Returns:
            True when both checks pass.

        Raises:
            FileNotFoundError: If nothing exists at ``audio_path``.
            ValueError: If the extension is not ``.mp3`` or ``.wav``
                (compared case-insensitively).
        """
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")
        valid_extensions = ['.mp3', '.wav']
        file_ext = os.path.splitext(audio_path)[1].lower()
        if file_ext not in valid_extensions:
            raise ValueError(f"Unsupported file format. Use: {valid_extensions}")
        return True

    def process_audio(self, audio_path: str) -> Optional[Dict[str, str]]:
        """Process a single audio file.

        Validates the path, uploads the file, adds a row that references the
        uploaded URI to the "AudioProcessor" action table, and extracts the
        generated text.

        Args:
            audio_path: Path of the audio file to process.

        Returns:
            A dict with "transcription" and "summary" keys, or None if any
            step raised (the error is printed instead of propagated).
        """
        try:
            # Validate audio
            self.validate_audio(audio_path)
            # Upload file
            print("Uploading audio...")
            file_response = self.client.file.upload_file(audio_path)
            print(f"Upload successful: {file_response.uri}")
            # Process in action table.
            # `p` is the jamaibase protocol alias the file imports; the
            # previous `t` alias was never defined, so this call always
            # failed with a NameError that the handler below swallowed.
            # NOTE(review): confirm the `TableType.ACTION` member name against
            # the installed jamaibase version.
            print("Processing audio...")
            response = self.client.table.add_table_rows(
                table_type=p.TableType.ACTION,
                request=p.RowAddRequest(
                    table_id="AudioProcessor",
                    data=[{"Audio": file_response.uri}],
                    stream=False,
                ),
            )
            # Extract and return results from the first (only) added row
            first_row = response.rows[0]
            results = {
                "transcription": first_row.columns["Transcription"].text,
                "summary": first_row.columns["Summary"].text
            }
            print("Processing complete!")
            return results
        except Exception as e:
            # Deliberately broad: every failure is reported and turned into
            # None so callers can simply test the result for truthiness.
            print(f"Error processing audio: {str(e)}")
            return None
# Initialize processor
# NOTE(review): PROJECT_ID and PAT are not defined in the visible part of this
# file; they must be set before these statements run.
processor = AudioProcessor(PROJECT_ID, PAT)
# Process single audio; process_audio returns None on failure, so the results
# are printed only when a dict came back.
result = processor.process_audio("path/to/audio.mp3")
if result:
    print(f"Transcription: {result['transcription']}")
    print(f"Summary: {result['summary']}")
def process_audio_batch(audio_folder: str):
    """Process every .mp3/.wav file directly inside a folder.

    Args:
        audio_folder: Directory whose entries are scanned (not recursive).

    Returns:
        A list with one dict per successfully processed file, holding the
        file's name under "filename" plus the keys returned by
        ``AudioProcessor.process_audio``. Files whose processing returned a
        falsy result are left out.
    """
    audio_suffixes = ('.mp3', '.wav')
    worker = AudioProcessor(PROJECT_ID, PAT)
    collected = []
    for entry in os.listdir(audio_folder):
        # Skip anything that is not a supported audio file (case-insensitive).
        if not entry.lower().endswith(audio_suffixes):
            continue
        outcome = worker.process_audio(os.path.join(audio_folder, entry))
        if not outcome:
            continue
        collected.append({"filename": entry, **outcome})
    return collected
# Usage: process every supported file in the folder, then print one entry per
# file that was processed successfully, separated by a "---" line.
results = process_audio_batch("path/to/audio/folder")
for result in results:
    print(f"File: {result['filename']}")
    print(f"Transcription: {result['transcription']}")
    print(f"Summary: {result['summary']}")
    print("---")
import os
import argparse
from jamaibase import JamAI, protocol as p
from typing import Dict, Optional
class AudioProcessor:
    """Upload audio files to JamAI Base and read back the generated text.

    Each file is uploaded through the client's file API and then added as a
    row to the "AudioProcessor" action table; the "Transcription" and
    "Summary" columns of the first returned row are handed back to the caller.
    """

    def __init__(self, project_id: str, pat: str):
        """Create the JamAI client used for uploads and table calls.

        Args:
            project_id: Project ID passed to the client.
            pat: Personal Access Token, passed to the client as ``token``.
        """
        self.client = JamAI(
            project_id=project_id,
            token=pat
        )

    def validate_audio(self, audio_path: str) -> bool:
        """Check that the path exists and ends in a supported extension.

        Args:
            audio_path: Path of the audio file to check.

        Returns:
            True when both checks pass.

        Raises:
            FileNotFoundError: If nothing exists at ``audio_path``.
            ValueError: If the extension is not ``.mp3`` or ``.wav``
                (compared case-insensitively).
        """
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio file not found: {audio_path}")
        valid_extensions = ['.mp3', '.wav']
        file_ext = os.path.splitext(audio_path)[1].lower()
        if file_ext not in valid_extensions:
            raise ValueError(f"Unsupported file format. Use: {valid_extensions}")
        return True

    def process_audio(self, audio_path: str) -> Optional[Dict[str, str]]:
        """Validate, upload and process one audio file.

        Args:
            audio_path: Path of the audio file to process.

        Returns:
            A dict with "transcription" and "summary" keys, or None if any
            step raised (the error is printed instead of propagated).
        """
        try:
            self.validate_audio(audio_path)
            print(f"Processing audio: {audio_path}")
            print("Uploading audio...")
            file_response = self.client.file.upload_file(audio_path)
            print("Upload successful!")
            print("Extracting information...")
            # `p` is the jamaibase protocol alias imported by this file; the
            # previous `t` alias was never defined, so this call always
            # failed with a NameError that the handler below swallowed.
            # NOTE(review): confirm the `TableType.ACTION` member name against
            # the installed jamaibase version.
            response = self.client.table.add_table_rows(
                table_type=p.TableType.ACTION,
                request=p.RowAddRequest(
                    table_id="AudioProcessor",
                    data=[{"Audio": file_response.uri}],
                    stream=False,
                ),
            )
            # Only one row was added, so the outputs live on the first row.
            first_row = response.rows[0]
            results = {
                "transcription": first_row.columns["Transcription"].text,
                "summary": first_row.columns["Summary"].text
            }
            return results
        except Exception as e:
            # Deliberately broad: every failure is reported and turned into
            # None so callers can simply test the result for truthiness.
            print(f"Error: {str(e)}")
            return None
def process_folder(folder_path: str, processor: AudioProcessor) -> None:
    """Process all .mp3/.wav files in a folder and print the results.

    Args:
        folder_path: Directory whose entries are scanned (not recursive).
        processor: Object whose ``process_audio(path)`` returns a dict with
            "transcription" and "summary" keys, or a falsy value on failure.

    Prints "Folder not found" and returns early when the path does not exist.
    """
    if not os.path.exists(folder_path):
        print(f"Folder not found: {folder_path}")
        return

    audio_suffixes = ('.mp3', '.wav')
    collected = []
    for entry in os.listdir(folder_path):
        # Skip anything that is not a supported audio file (case-insensitive).
        if not entry.lower().endswith(audio_suffixes):
            continue
        outcome = processor.process_audio(os.path.join(folder_path, entry))
        if outcome:
            collected.append({"filename": entry, **outcome})

    # Report: a header, then one divider-terminated entry per processed file.
    divider = "-" * 50
    print("\nProcessing Results:")
    print(divider)
    for result in collected:
        print(f"File: {result['filename']}")
        print(f"Transcription: {result['transcription']}")
        print(f"Summary: {result['summary']}")
        print(divider)
def main():
    """Command-line entry point: process one audio file or a whole folder.

    Requires ``--project-id``, ``--pat`` and ``--input``. When ``--input`` is
    an existing file it is processed on its own and its results are printed;
    any other value is handed to ``process_folder``.
    """
    cli = argparse.ArgumentParser(description='Process audio files using JamAIBase')
    required_options = (
        ('--project-id', 'Your JamAIBase project ID'),
        ('--pat', 'Your Personal Access Token'),
        ('--input', 'Path to audio file or folder'),
    )
    for flag, help_text in required_options:
        cli.add_argument(flag, required=True, help=help_text)
    options = cli.parse_args()

    processor = AudioProcessor(options.project_id, options.pat)

    # Anything that is not a regular file is treated as a folder.
    if not os.path.isfile(options.input):
        process_folder(options.input, processor)
        return

    result = processor.process_audio(options.input)
    if not result:
        return
    divider = "-" * 50
    print("\nResults:")
    print(divider)
    print(f"Transcription: {result['transcription']}")
    print(f"Summary: {result['summary']}")
    print(divider)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Processing audio: discussion_recording.mp3
Uploading audio...
Upload successful!
Extracting information...
Results:
--------------------------------------------------
Transcription: ... [The full transcription of the audio] ...
Summary: This audio discusses the cost of AI model training. Stanford researchers...
--------------------------------------------------