שלום לכולם!
אני רוצה לשתף אתכם בפרויקט מחקר ופיתוח שעבדתי עליו בתקופה האחרונה (הקוד נבנה ע"י gemini-3-pro-preview), שמוכיח שהבלתי אפשרי - אפשרי.
הצלחתי לפתח מודם אקוסטי (Data-over-Sound) שמאפשר להעביר קבצים (טקסט, תמונות, מסמכים) לתוך מחשב מנותק רשת (Offline/Air-Gapped) באמצעות שיחה קולית פשוטה מטלפון כשר! (דרך מערכת טלפונית בימות המשיח וכד')
זה נשמע כמו מדע בדיוני או חזרה לשנות ה-80, אבל עם האלגוריתמים הנכונים, זה עובד בצורה מדהימה ב-2026.
למה זה טוב?
עד היום, כדי להכניס למחשב מנותק מאינטרנט קובץ הייתם צריכים דיסק-און-קי פיזי.
הפיתוח הזה מאפשר לקבל קבצים דחופים למחשב המנותק ע"י השמעת הקלטה דרך הטלפון (רמקול או כבל AUX).
וכמובן עוד הרבה אפשרוית לפיתוח הלאה שאין להם סוף.
️ מה יש מתחת למכסה המנוע? ("פרוטוקול טיטניום")
כדי להתגבר על האיכות הגרועה של קווי הטלפון (דחיסת GSM/G.729) ורעשי הרקע, בניתי פרוטוקול שידור סופר-קשוח שכולל:
- Triple Voting (הצבעת רוב): כל בייט נשלח 3 פעמים. אם רעש הורס אחד מהם, המחשב לוקח את הרוב הקובע.
- Reed-Solomon ECC: תיקון שגיאות מתמטי (כמו ב-QR Code) שמשחזר מידע שנמחק.
- Interleaving (ערבוב): פיזור הביטים כך שקטיעה רגעית בקו לא תמחק אות שלמה, אלא רק "שריטות" קטנות שקל לתקן.
- CRC32: בדיקת תקינות סופית - אם הקובץ לא שוחזר ב-100% דיוק, המערכת תודיע.
המטרה שלי בפרסום
אני מפרסם כאן את הקוד המלא (Python) כקוד פתוח.
הוכחתי את ההיתכנות הטכנית ברמה הגבוהה ביותר (DSP). המטרה שלי היא שמפתחים תותחים מכאן יקחו את הליבה הזו ויפתחו אותה למוצר נגיש, שירותי המרה, אפליקציות או כל רעיון אחר.
קישור לתוכנה המקומפלת להורדה
זה הקובץ של התוכנה המקומפל ניתן ליצור באמצעותו קובץ שמע ולעלות לימות המשיח ואז להשמיע את ההקלטה דרך הטלפון לתוכנה במחשב המנותק מרשת.
שימו לב זו גרסה ראשונית לצורך ניסוים בלבד! מומלץ לנסות בהתחלה להמיר קובץ טקסט בעל מילים בודדות בלבד!
הוראות שימוש (חובה לקרוא!)
הפיזיקה של הסאונד היא עדינה. כדי שזה יעבוד "אחד לאחד", הקפידו על הכללים הבאים:
-
ווליום (הכי חשוב!):
- אין להגביר את הטלפון למקסימום! זה יוצר עיוות (Distortion) שהורס את האות הדיגיטלי.
- יש לכוון לעוצמה של כ-60%-70%.
- בתוכנה יש "מד עוצמה" – כוונו כך שהפס יהיה ירוק. אם הוא אדום, זה לא יעבוד.
-
חיבור AUX (מומלץ):
- הדרך הכי נקייה היא לחבר כבל AUX בין הטלפון לכניסת המיקרופון (Line-In) במחשב.
- שים לב: במחשבים ניידים עם שקע בודד (Combo Jack), כבל AUX רגיל לא יעבוד כמיקרופון! צריך כרטיס קול USB פשוט (עולה כמה שקלים) או מפצל ייעודי.
-
שימוש ברמקול (אקוסטי):
- אפשרי בהחלט!
- יש להיות בחדר שקט יחסית.
- יש להצמיד את רמקול הטלפון למיקרופון של המחשב.
המדריך הטכני למפתח
כדי להריץ את זה, תצטרכו Python מותקן על המחשב.
1. התקנת הספריות הדרושות:
פתחו CMD והריצו:
pip install numpy scipy sounddevice soundfile reedsolo pyinstaller
2. הקוד המלא:
העתיקו את הקוד הבא ושמרו אותו בקובץ בשם main.py. הקוד כולל ממשק גרפי (GUI), מקודד (Encoder) ומפענח (Decoder).
import sys
import os
import struct
import threading
import base64
import time
import queue
import zlib
import datetime
import numpy as np
import sounddevice as sd
import soundfile as sf
from scipy.io import wavfile
from scipy import signal as sig
from reedsolo import RSCodec, ReedSolomonError
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
# === הגדרות מודם: "Titanium Edition" ===
SAMPLE_RATE = 8000
BAUD_RATE = 40 # מהירות יציבה
FREQ_MARK = 900 # תדרים אופטימליים לקו טלפון
FREQ_SPACE = 1700
RS_ECC_SYMBOLS = 20 # תיקון שגיאות בסיסי (השילוש עושה את רוב העבודה)
INTERLEAVE_STEP = 40 # פיזור רחב מאוד
# סנכרון
SYNC_DURATION = 0.8
t_sync = np.arange(int(SAMPLE_RATE * SYNC_DURATION)) / SAMPLE_RATE
SYNC_SIGNAL = sig.chirp(t_sync, f0=FREQ_MARK, f1=FREQ_SPACE, t1=SYNC_DURATION, method='linear')
# Magic Byte
MAGIC_BITS = "01000010" # 'B'
TEXT_EXTENSIONS = ['.txt', '.csv', '.json', '.xml', '.html', '.php', '.py', '.js', '.ini', '.log']
# === לוגים ===
def write_log(msg):
ts = datetime.datetime.now().strftime("%H:%M:%S")
print(f"[{ts}] {msg}")
try:
with open("LOG_TITANIUM.txt", "a", encoding="utf-8") as f: f.write(f"[{ts}] {msg}\n")
except: pass
# === לוגיקה מתמטית ===
def interleave_bits(bits):
n = len(bits)
padding = (INTERLEAVE_STEP - (n % INTERLEAVE_STEP)) % INTERLEAVE_STEP
bits_padded = bits + [0] * padding
interleaved = [0] * len(bits_padded)
rows = len(bits_padded) // INTERLEAVE_STEP
for i in range(len(bits_padded)):
row = i // INTERLEAVE_STEP
col = i % INTERLEAVE_STEP
new_idx = col * rows + row
interleaved[new_idx] = bits_padded[i]
return interleaved, padding
def deinterleave_bits(bits, padding):
n = len(bits)
rows = n // INTERLEAVE_STEP
if rows == 0: return bits
deinterleaved = [0] * n
for i in range(n):
col = i // rows
row = i % rows
original_idx = row * INTERLEAVE_STEP + col
if original_idx < n:
deinterleaved[original_idx] = bits[i]
if padding > 0: return deinterleaved[:-padding]
return deinterleaved
def text_to_bits(data_bytes):
bits = []
for byte in data_bytes:
for i in range(7, -1, -1): bits.append((byte >> i) & 1)
return bits
def bits_to_bytes(bits):
chars = []
for b in range(0, len(bits), 8):
byte = bits[b:b+8]
if len(byte) < 8: break
chars.append(int(''.join(map(str, byte)), 2))
return bytes(chars)
def vote_byte(b1, b2, b3):
if b1 == b2 or b1 == b3: return b1
if b2 == b3: return b2
return b1 # ברירת מחדל
# === Encoder ===
def generate_signal(filename, file_content, output_wav):
try:
write_log(f"מקודד: {filename}")
ext = os.path.splitext(filename)[1].lower()
is_text = ext in TEXT_EXTENSIONS
fname_bytes = os.path.basename(filename).encode('utf-8')
if is_text:
content_bytes = file_content
type_flag = 1
else:
content_bytes = base64.b64encode(file_content)
type_flag = 0
if len(fname_bytes) > 200: fname_bytes = fname_bytes[:200]
# 1. אריזה + CRC
crc = zlib.crc32(content_bytes) & 0xffffffff
packet = struct.pack('B', len(fname_bytes)) + fname_bytes + struct.pack('B', type_flag) + content_bytes + struct.pack('>I', crc)
# 2. קידוד RS
rsc = RSCodec(RS_ECC_SYMBOLS)
encoded_data = rsc.encode(packet)
# 3. שילוש הנתונים (Triple Redundancy) על הכל!
# זה השדרוג הגדול. כל בייט הופך ל-3.
tripled_data = bytearray()
for b in encoded_data:
tripled_data.extend([b, b, b])
# 4. המרה לביטים
raw_bits = text_to_bits(tripled_data)
# 5. ערבוב (Interleaving)
# זה מפזר את השילוש, כך שרעש לא ימחק את כל ה-3 עותקים
shuffled_bits, padding = interleave_bits(raw_bits)
# 6. כותרת: [Magic] [Length] [Padding]
magic_bits = text_to_bits(bytes([0x42])) # 'B'
# האורך הוא של הביטים המעורבבים
header_info = struct.pack('>IB', len(shuffled_bits), padding)
header_bits = text_to_bits(header_info)
header_bits_tripled = header_bits * 3
final_stream_bits = magic_bits + header_bits_tripled + shuffled_bits
# 7. יצירת אודיו
samples_per_bit = int(SAMPLE_RATE / BAUD_RATE)
t_wake = np.arange(int(SAMPLE_RATE * 3.0)) / SAMPLE_RATE
wake = np.sin(2 * np.pi * FREQ_SPACE * t_wake)
t_bit = np.arange(samples_per_bit) / SAMPLE_RATE
phase = 0
waves = []
for bit in final_stream_bits:
freq = FREQ_MARK if bit == 1 else FREQ_SPACE
w = np.sin(2 * np.pi * freq * t_bit + phase)
waves.append(w)
phase += 2 * np.pi * freq * (samples_per_bit / SAMPLE_RATE)
phase %= 2 * np.pi
final_audio = np.concatenate([wake, SYNC_SIGNAL, np.concatenate(waves)])
final_audio = sig.lfilter([1.0, -0.8], [1.0], final_audio)
m = np.max(np.abs(final_audio))
final_audio = (final_audio / m * 32767).astype(np.int16)
wavfile.write(output_wav, SAMPLE_RATE, final_audio)
return True, "קובץ טיטניום נוצר! (Triple Payload)"
except Exception as e: return False, str(e)
# === Decoder ===
def decode_signal(input_wav, output_folder):
try:
with open("LOG_TITANIUM.txt", "w", encoding="utf-8") as f: f.write("=== LOG START ===\n")
rate, data = wavfile.read(input_wav)
if len(data.shape) > 1: data = data[:, 0]
sos = sig.butter(6, [600, 2000], btype='bandpass', fs=SAMPLE_RATE, output='sos')
clean = sig.sosfilt(sos, data.astype(float))
mx = np.max(np.abs(clean))
if mx < 0.01: return False, "שקט מדי"
clean /= mx
# 1. סנכרון
corr = sig.correlate(clean, SYNC_SIGNAL, mode='valid')
peak = np.argmax(np.abs(corr))
if np.abs(corr[peak]) < 3: return False, "לא נמצא אות סנכרון"
approx_start = peak + len(SYNC_SIGNAL)
relevant = clean[approx_start:]
spb = int(SAMPLE_RATE / BAUD_RATE)
num_bits = len(relevant) // spb
t = np.arange(spb) / SAMPLE_RATE
ref_m = np.exp(-2j * np.pi * FREQ_MARK * t)
ref_s = np.exp(-2j * np.pi * FREQ_SPACE * t)
bits_str = ""
for i in range(num_bits):
chunk = relevant[i*spb : (i+1)*spb]
pm = np.abs(np.dot(chunk, ref_m))**2
ps = np.abs(np.dot(chunk, ref_s))**2
bits_str += "1" if pm > ps else "0"
# 2. Magic Byte
magic_idx = bits_str.find(MAGIC_BITS)
if magic_idx == -1:
inv = bits_str.replace('0','x').replace('1','0').replace('x','1')
magic_idx = inv.find(MAGIC_BITS)
if magic_idx != -1:
bits_str = inv
write_log("היפוך פאזה זוהה")
else: return False, "Magic Byte לא נמצא"
data_start = magic_idx + 8
# 3. Header Voting
if len(bits_str) < data_start + 120: return False, "קצר מדי"
h_part = bits_str[data_start : data_start + 120]
h_chunks = [h_part[0:40], h_part[40:80], h_part[80:120]]
if h_chunks[0] == h_chunks[1] or h_chunks[0] == h_chunks[2]: final_h = h_chunks[0]
elif h_chunks[1] == h_chunks[2]: final_h = h_chunks[1]
else: final_h = h_chunks[0]
try:
h_bytes = bits_to_bytes([int(x) for x in final_h])
data_len, padding = struct.unpack('>IB', h_bytes)
write_log(f"אורך מעורבב: {data_len}, ריפוד: {padding}")
except: return False, "שגיאת כותרת"
if data_len > 10000000 or data_len == 0: return False, f"אורך לא הגיוני {data_len}"
# 4. חילוץ וסידור מחדש (De-Interleave)
payload_idx = data_start + 120
if len(bits_str) < payload_idx + data_len:
bits_str = bits_str.ljust(payload_idx + data_len, '0')
shuffled = [int(b) for b in bits_str[payload_idx : payload_idx + data_len]]
raw_bits = deinterleave_bits(shuffled, padding)
raw_bytes = bits_to_bytes(raw_bits)
# 5. הצבעת רוב על התוכן (Payload Voting) - החלק החדש!
# יש לנו כעת רצף בייטים שכל אחד מהם חוזר 3 פעמים
# [A, A, A, B, B, B, ...]
reconstructed_payload = bytearray()
fixed_count = 0
for i in range(0, len(raw_bytes), 3):
chunk = raw_bytes[i : i+3]
if len(chunk) < 3: break
b1, b2, b3 = chunk[0], chunk[1], chunk[2]
if b1 == b2 and b2 == b3:
# הסכמה מלאה
res = b1
else:
# נדרש תיקון
res = vote_byte(b1, b2, b3)
fixed_count += 1
reconstructed_payload.append(res)
write_log(f"תוקנו באמצעות Voting: {fixed_count} בייטים")
# 6. RS Decode (שכבה שנייה)
try:
rsc = RSCodec(RS_ECC_SYMBOLS)
decoded_packet = rsc.decode(bytes(reconstructed_payload))[0]
# 7. פירוק
ptr = 0
name_len = decoded_packet[ptr]; ptr+=1
filename = decoded_packet[ptr:ptr+name_len].decode('utf-8', errors='ignore'); ptr+=name_len
file_type = decoded_packet[ptr]; ptr+=1
content = decoded_packet[ptr:-4]
rec_crc = struct.unpack('>I', decoded_packet[-4:])[0]
calc_crc = zlib.crc32(content) & 0xffffffff
if calc_crc != rec_crc:
write_log(f"CRC שגוי: {rec_crc} != {calc_crc}")
# אם ה-RS עבר, אולי נשמור בכל זאת עם אזהרה?
# לא, נחמיר
return False, "שגיאת CRC (תוכן לא תואם)"
final_data = content
if file_type == 0:
try: final_data = base64.b64decode(content)
except: pass
path = os.path.join(output_folder, filename)
with open(path, 'wb') as f: f.write(final_data)
return True, f"הצלחה! {filename}"
except ReedSolomonError:
write_log("RS נכשל")
return False, "תיקון שגיאות סופי נכשל"
except Exception as e: return False, str(e)
# === GUI ===
class ModemApp:
def __init__(self, root):
self.root = root
self.root.title("Acoustic Modem - TITANIUM EDITION")
self.root.geometry("600x700")
self.audio_buffer = []
self.is_rec = False
tabs = ttk.Notebook(root)
t1 = ttk.Frame(tabs); tabs.add(t1, text="Encode")
t2 = ttk.Frame(tabs); tabs.add(t2, text="Decode")
tabs.pack(fill="both", expand=True)
tk.Button(t1, text="צור קובץ (Titanium - Full Triple)", command=self.do_enc, bg="#007bff", fg="white", font=("Arial", 14)).pack(pady=50)
self.lbl_enc = tk.Label(t1, text="", fg="green")
self.lbl_enc.pack()
self.canvas_vol = tk.Canvas(t2, width=400, height=30, bg="black")
self.canvas_vol.pack(pady=10)
self.vol_bar = self.canvas_vol.create_rectangle(0,0,0,30, fill="green")
self.dev_combo = ttk.Combobox(t2, values=self.get_devs(), width=50)
if self.get_devs(): self.dev_combo.current(0)
self.dev_combo.pack(pady=10)
self.btn_rec = tk.Button(t2, text="🔴 התחל הקלטה", command=self.toggle_rec, bg="#dc3545", fg="white", font=("Arial", 14))
self.btn_rec.pack(pady=20)
self.lbl_status = tk.Label(t2, text="מוכן", font=("Arial", 14, "bold"))
self.lbl_status.pack()
tk.Button(t2, text="טען קובץ WAV", command=self.load_file).pack(pady=20)
tk.Button(t2, text="פתח לוג", command=lambda: os.startfile("LOG_TITANIUM.txt") if os.path.exists("LOG_TITANIUM.txt") else None).pack()
def get_devs(self):
try: return [f"{i}: {d['name']}" for i,d in enumerate(sd.query_devices()) if d['max_input_channels']>0]
except: return []
def do_enc(self):
fn = filedialog.askopenfilename()
if not fn: return
sn = filedialog.asksaveasfilename(defaultextension=".wav", filetypes=[("WAV","*.wav")])
if not sn: return
try:
with open(fn,'rb') as f: c=f.read()
ok, msg = generate_signal(fn, c, sn)
if ok: messagebox.showinfo("Success", msg)
else: messagebox.showerror("Error", msg)
except Exception as e: messagebox.showerror("Error", str(e))
def toggle_rec(self):
if not self.is_rec:
self.is_rec = True
self.btn_rec.config(text="⏹️ סיים ופענח", bg="#333")
self.audio_buffer = []
self.lbl_status.config(text="מקליט...", fg="red")
threading.Thread(target=self.rec_thread).start()
else:
self.is_rec = False
self.btn_rec.config(text="⏳ מעבד...", state="disabled")
self.lbl_status.config(text="מפענח (Voting & RS)...", fg="blue")
def update_vol(self, indata):
peak = np.max(np.abs(indata)) / 32768.0
w = min(400, int(peak * 400))
c = "green" if w < 300 else "red"
self.canvas_vol.coords(self.vol_bar, 0, 0, w, 30)
self.canvas_vol.itemconfig(self.vol_bar, fill=c)
def rec_thread(self):
idx = int(self.dev_combo.get().split(":")[0]) if self.dev_combo.get() else None
def cb(indata, f, t, s):
if self.is_rec:
self.audio_buffer.append(indata.copy())
try: self.root.after(10, lambda: self.update_vol(indata))
except: pass
try:
with sd.InputStream(device=idx, samplerate=44100, channels=1, dtype='int16', callback=cb):
while self.is_rec: sd.sleep(100)
self.process_audio()
except Exception as e:
self.is_rec = False
messagebox.showerror("Error", str(e))
self.root.after(0, lambda: self.btn_rec.config(text="🔴 התחל הקלטה", bg="#dc3545", state="normal"))
def process_audio(self):
if not self.audio_buffer: return
full = np.concatenate(self.audio_buffer, axis=0)
sf.write("DEBUG_INPUT.wav", full, 44100)
num = int(len(full) * SAMPLE_RATE / 44100)
data = sig.resample(full, num).astype(np.int16)
tmp = "temp_dec.wav"
wavfile.write(tmp, SAMPLE_RATE, data)
dl = "Recovered_Files"
if not os.path.exists(dl): os.makedirs(dl)
ok, msg = decode_signal(tmp, dl)
self.root.after(0, lambda: self.finish(ok, msg))
def load_file(self):
fn = filedialog.askopenfilename()
if not fn: return
self.lbl_status.config(text="מעבד...", fg="blue")
dl = "Recovered_Files"
if not os.path.exists(dl): os.makedirs(dl)
ok, msg = decode_signal(fn, dl)
self.finish(ok, msg)
def finish(self, ok, msg):
self.is_rec = False
self.btn_rec.config(text="🔴 התחל הקלטה", bg="#dc3545", state="normal")
self.lbl_status.config(text="הסתיים", fg="black")
if ok: messagebox.showinfo("Success", msg)
else: messagebox.showerror("Failed", msg)
if __name__ == "__main__":
root = tk.Tk()
app = ModemApp(root)
root.mainloop()
3. יצירת קובץ EXE (קומפילציה):
כדי להפוך את זה לתוכנה ניידת ללא צורך בהתקנת פייתון הריצו את הפקודה הבאה בתקיה שבה נמצא הקוד למעלה:
pyinstaller --onefile --noconsole --name "AcousticModem" main.py
הקובץ המוכן יחכה לכם בתיקייה dist.
בהצלחה! 