Environment Configuration¶
!pip install torchcodec soundfile
import os
import torch
import torchaudio
import torch.nn.functional as F
import matplotlib.pyplot as plt
import IPython.display as ipd
import numpy as np
from google.colab import drive
drive.mount('/content/drive')
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
SAMPLE_RATE = 16000
# You can replace "eight" with another word
TARGET_DIR = "/content/drive/MyDrive/MAT 201A | Zejun Huang/SpeechCommands/speech_commands_v0.02/eight"
all_files = [f for f in os.listdir(TARGET_DIR) if f.endswith('.wav')]
file_path = os.path.join(TARGET_DIR, all_files[0])
waveform, sr = torchaudio.load(file_path)
waveform = waveform.to(DEVICE)
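# Pad or trim the waveform to exactly 1 second (16,000 samples at 16 kHz)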
if waveform.shape[1] < 16000:
waveform = F.pad(waveform, (0, 16000 - waveform.shape[1]))
else:
waveform = waveform[:, :16000]
Define Visualization Tools¶
plt.rcdefaults()
mel_transform = torchaudio.transforms.MelSpectrogram(
sample_rate=16000, n_mels=64, n_fft=1024, hop_length=256
).to(DEVICE)
def show_result(wav_tensor, title="Audio"):
wav_np = wav_tensor.cpu().squeeze().numpy()
duration = len(wav_np) / 16000
time_axis = np.linspace(0, duration, len(wav_np))
with torch.no_grad():
mel = mel_transform(wav_tensor)
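# Log-compress the mel spectrogram; the clamp avoids log(0)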
mel = torch.clamp(mel, min=1e-5).log().cpu().squeeze().numpy()
fig = plt.figure(figsize=(8, 2.5))
plt.subplots_adjust(wspace=0.25, bottom=0.2)
ax1 = plt.subplot(1, 2, 1)
ax1.plot(time_axis, wav_np, linewidth=0.8, color='#1f77b4')
ax1.set_xlim(0, duration)
ax1.set_ylim(-1.1, 1.1)
ax1.set_ylabel("Amplitude", fontsize=9)
ax1.set_xlabel("Time (s)", fontsize=9)
ax1.spines['top'].set_visible(False)
ax1.spines['right'].set_visible(False)
ax1.grid(True, linestyle=':', alpha=0.4)
ax1.text(0.5, -0.3, f"(a) Waveform: {title}", transform=ax1.transAxes,
ha='center', va='top', fontsize=10, fontweight='normal')
ax2 = plt.subplot(1, 2, 2)
ax2.imshow(mel, aspect='auto', origin='lower', cmap='inferno', extent=[0, duration, 0, 64])
ax2.set_xlabel("Time (s)", fontsize=9)
ax2.set_ylabel("Frequency (Mel)", fontsize=9)
ax2.spines['top'].set_visible(False)
ax2.spines['right'].set_visible(False)
ax2.text(0.5, -0.3, f"(b) Spectrogram: {title}", transform=ax2.transAxes,
ha='center', va='top', fontsize=10, fontweight='normal')
plt.show()
print(f"Play: {title}\n")
ipd.display(ipd.Audio(wav_np, rate=16000))
Original¶
show_result(waveform, title="Original (Clean)")
Play: Original (Clean)
Packet Loss¶
Each frame is modeled as being in one of two possible states:
- State B (Bad) — the frame is lost
- State G (Good) — the frame is received normally
The transition probabilities are:
- P(B to B)=q (If the current frame is lost, the next frame will continue to be lost with probability q.)
- P(B to G)=1-q
- P(G to B)=p (If the current frame is good, the next frame starts a loss event with probability p.)
- P(G to G)=1-p
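Written as a transition matrix (rows and columns ordered G, B): $$P = \begin{pmatrix} 1-p & p \\ 1-q & q \end{pmatrix}$$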
Let the long-term (average) loss rate be $p_{loss}$ (the parameter p_loss). The stationary balance equation of the Markov chain is $$p_{loss} = p_{loss}\, q + (1 - p_{loss})\, p$$ Rearranging,
$$p_{loss}(1 - q) = (1 - p_{loss})\, p \quad\Rightarrow\quad p_{loss} = \frac{p}{p + 1 - q}$$
Solving for the transition probability $p$ that achieves a target loss rate:
$$p = \frac{p_{loss}(1 - q)}{1 - p_{loss}}$$
This is exactly what the code implements:
p = p_loss * (1 - q) / (1 - p_loss + 1e-8)
(The term 1e-8 is simply a small numerical safeguard to prevent division by zero.)
Once the system enters the bad (lost) state, the burst length follows a geometric distribution.
Expected number of consecutive lost frames:
$$E[\text{burst length}] = \frac{1}{1 - q}$$
Examples:
- $q = 0.3 \rightarrow$ expected $\approx 1.43$ frames ($\approx 28.6$ ms)
- $q = 0.7 \rightarrow$ expected $\approx 3.33$ frames ($\approx 66.7$ ms)
So $q$ directly controls how “sticky” the bad state is.
I simulate typical VoIP jitter using short bursts (q ≈ 0.2–0.5).
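As a quick sanity check of this mapping, a standalone simulation (illustrative only; the helper name, frame count, and seed are arbitrary choices, and it is not used by the pipeline below) should recover the target loss rate:

import random

def estimate_loss_rate(p_loss=0.3, q=0.3, n_frames=100_000, seed=0):
    # Simulate the two-state chain frame by frame and measure the empirical loss rate
    random.seed(seed)
    p = p_loss * (1 - q) / (1 - p_loss + 1e-8)
    state = random.random() < p_loss  # True = Bad (lost)
    lost = 0
    for _ in range(n_frames):
        lost += int(state)
        threshold = q if state else p
        state = random.random() < threshold
    return lost / n_frames

print(estimate_loss_rate())  # should land close to p_loss = 0.3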
def apply_packet_loss(waveform, p_loss, burst_prob=0.3):
if p_loss <= 0: return waveform
B, L = waveform.shape
# The waveform is split into 20 ms frames, which mimic network packets.
frame_len = int(SAMPLE_RATE * 0.02)
n_frames = L // frame_len
# A two-state Markov process determines whether each frame is lost:
wav_framed = waveform[:, :n_frames*frame_len].view(B, n_frames, frame_len)
out = wav_framed.clone()
q = burst_prob
p = p_loss * (1 - q) / (1 - p_loss + 1e-8)
mask = torch.zeros(B, n_frames, dtype=torch.bool, device=waveform.device)
current_state = (torch.rand(B, device=waveform.device) < p_loss)
mask[:, 0] = current_state
p_tensor = torch.tensor(p, device=waveform.device)
q_tensor = torch.tensor(q, device=waveform.device)
for t in range(1, n_frames):
rand_vals = torch.rand(B, device=waveform.device)
threshold = torch.where(current_state, q_tensor, p_tensor)
current_state = rand_vals < threshold
mask[:, t] = current_state
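# Conceal lost frames: a loss in the very first frame becomes silence; later lost frames repeat the previous frame's samples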
if mask[:, 0].any():
out[mask[:, 0], 0, :] = 0
for t in range(1, n_frames):
m = mask[:, t].unsqueeze(1)
out[:, t, :] = torch.where(m, out[:, t-1, :], out[:, t, :])
out = out.view(B, -1)
if out.shape[1] < L:
out = F.pad(out, (0, L - out.shape[1]))
return out
out = apply_packet_loss(waveform, p_loss=0.3, burst_prob=0.3)
show_result(out, title="Packet Loss (30%)")
Play: Packet Loss (30%)
Bandwidth¶
Although resampling is not a direct mechanism for bandwidth reduction in real-world systems, its effect (weakening or losing high-frequency information) is highly consistent with the effective bandwidth reduction caused by low-bit-rate vocoders, narrowband microphones, and noise reduction algorithms. Therefore, it is a widely used proxy method in the literature.
1. Low Bitrate Modes of Codecs¶
For example, AMR-NB (telephony), G.729, and Opus-NB often compress audio by:
- Limiting the frequency band (e.g., 300–3400 Hz).
- Automatically skipping high-frequency encoding.
This is not downsampling but rather "encoding discard": nominally the sample rate remains 8 kHz, yet the high-frequency content is severely distorted or missing entirely.
2. VoIP/Zoom Automatic Bandwidth Reduction (Weak Network)¶
Under weak network conditions:
- Opus-VoIP (for example) may reduce the effective bandwidth to 4–6 kHz in low bitrate modes.
- High-frequency details become blurred, though the global sample rate is not lowered.
- The algorithm operates adaptively.
3. Physical Bandwidth Limitations of Microphones/Speakers¶
Low-end microphones often have a restricted actual bandwidth of only:
- 150 Hz – 5 kHz
- or 200 Hz – 4 kHz

Regardless of whether the input source has a high sample rate, the hardware itself is physically incapable of reproducing high frequencies.
4. DSP Denoising Algorithms Erasing High Frequencies (Very Common)¶
For example, in smartphones:
- Spectral Subtraction
- Wiener Filter
- AGC (Automatic Gain Control)
- Beamforming
These algorithms often treat high-frequency components as noise and filter them out directly, particularly in noisy environments.
- Sound becomes dull, muffled, and bandwidth-limited.
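The code below uses resample-and-upsample as the proxy. Another common proxy (not used in this notebook) is a direct low-pass filter; a minimal sketch with torchaudio.functional.lowpass_biquad, where the 3.4 kHz cutoff is an illustrative choice that roughly matches telephone bandwidth:

def apply_lowpass(waveform, cutoff_hz=3400.0):
    # Attenuate content above cutoff_hz with a single biquad low-pass filter
    return torchaudio.functional.lowpass_biquad(waveform, SAMPLE_RATE, cutoff_freq=cutoff_hz)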
def apply_bandwidth(waveform, target_sr):
if target_sr >= SAMPLE_RATE: return waveform
down = torchaudio.transforms.Resample(SAMPLE_RATE, target_sr).to(waveform.device)
up = torchaudio.transforms.Resample(target_sr, SAMPLE_RATE).to(waveform.device)
wav_low = down(waveform) #The signal is downsampled from 16 kHz to a low target rate (e.g., 4 kHz).
wav_back = up(wav_low) #It is then upsampled back to 16 kHz
if wav_back.shape[1] < waveform.shape[1]:
wav_back = F.pad(wav_back, (0, waveform.shape[1] - wav_back.shape[1]))
else:
wav_back = wav_back[:, :waveform.shape[1]]
return wav_back
out = apply_bandwidth(waveform, target_sr=8000)
show_result(out, title="Low Bandwidth (8kHz)")
Play: Low Bandwidth (8kHz)
Clipping¶
Simulate the hard truncation that occurs in amplifiers or ADCs when the input signal is too large.
- The tops and bottoms of the waveform are flattened
- Strong harmonic distortion is generated
- The sound becomes "broken", "blown-out", and "harsh"
def apply_clipping(waveform, threshold):
if threshold >= 1.0: return waveform
return torch.clamp(waveform, -threshold, threshold)
out = apply_clipping(waveform, threshold=0.2)
show_result(out, title="Distortion (Clip 0.2)")
Play: Distortion (Clip 0.2)
Reverb¶
Simulate room reverberation.
Generate a simple Room Impulse Response (RIR) and then apply it to the audio by convolution.
In a real room, if you clap your hands, you'll hear:
- Direct sound
- Wall reflections (early reflections)
- A scattered, decaying reverberation tail
Mathematically:
Reverberation = Original signal * Room impulse response (convolution)
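In discrete time, with an RIR $h$ of length $K$: $$y[n] = \sum_{k=0}^{K-1} h[k]\, x[n-k]$$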
def apply_reverb(waveform, room_scale):
if room_scale <= 0.01: return waveform
max_rir_len = int(SAMPLE_RATE * 0.5)
rir_len = int(max_rir_len * room_scale) #The RIR length is determined by room_scale.
if rir_len < 16: return waveform
t = torch.linspace(0, 1, rir_len, device=waveform.device) # Time axis from 0 to 1
decay_rate = 8.0 - (room_scale * 5.0)
decay = torch.exp(-t * decay_rate) #Constructing exponential decay (Reverb Tail)
noise = torch.randn(rir_len, device=waveform.device) #Using noise to simulate random reflection
rir = noise * decay
rir = rir / (torch.norm(rir, p=2) + 1e-8) # Normalize to avoid the signal becoming too loud after convolution
wav_in = waveform.unsqueeze(1)
rir_in = rir.view(1, 1, -1)
wet_signal = F.conv1d(wav_in, rir_in, padding=rir_len-1) #Convolution produces reverberation
wet_signal = wet_signal[:, 0, :waveform.shape[1]]
wet_gain = room_scale * 0.6 #Dry/wet ratio
dry_gain = 1.0 - (room_scale * 0.2)
out = (dry_gain * waveform) + (wet_gain * wet_signal)
return torch.clamp(out, -1.0, 1.0)
out = apply_reverb(waveform, room_scale=0.5)
show_result(out, title="Reverberation (Scale 0.5)")
Play: Reverberation (Scale 0.5)
Compression¶
Why do telephones use µ-law?
Because the human ear is more sensitive to weak signals (low volume) and less sensitive to strong signals.
The core idea of µ-law:
- Small signals: Amplify
- Large signals: Compress
- Ultimately, this makes the quantization error sound perceptually uniform across quiet and loud passages.
µ-law companding uses the following function: $$F(x) = \text{sgn}(x) \frac{\ln(1 + \mu |x|)}{\ln(1 + \mu)}$$
The compressed signal is decompressed using the inverse function. $$x = \text{sgn}(y) \frac{1}{\mu} \left( (1 + \mu)^{|y|} - 1 \right)$$
- The blue curve (compression), lying above $y=x$ for positive inputs, shows that small signals are amplified.
- The orange curve (expansion) lies below $y=x$ and restores the amplified small signals on decoding.
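As a concrete example with the telephone standard $\mu = 255$: an input of $x = 0.01$ maps to $F(x) = \ln(1 + 2.55)/\ln(256) \approx 0.23$, while $x = 0.5$ maps to $\approx 0.88$, so quiet samples receive a much larger share of the output range than loud ones.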
def apply_mu_law_compression(waveform, quality):
if quality >= 1024 or quality <= 1: return waveform
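# quality is the number of quantization levels (quality=16 gives 16 levels, i.e. 4-bit); mu = quality - 1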
mu = quality - 1
x = torch.clamp(waveform, -1.0, 1.0)
x_mu = torch.sign(x) * torch.log1p(mu * torch.abs(x)) / np.log(1 + mu) #Nonlinear Companding
x_quant = torch.round((x_mu + 1) / 2 * mu)
x_quant = torch.clamp(x_quant, 0, mu)
x_recon = 2 * (x_quant / mu) - 1
out = torch.sign(x_recon) * (1 / mu) * ((1 + mu)**torch.abs(x_recon) - 1)
return out
out = apply_mu_law_compression(waveform, quality=16)
show_result(out, title="Compression (4-bit)")
Play: Compression (4-bit)