המלצה | 🚀 [פרויקט מהפכני] העברת קבצים למחשב ללא אינטרנט דרך שיחה קולית (טלפון כשר!) - הקוד המלא
-
@MAD כוונתי הייתה לתוכנה עם ממשק גרפי - לא שמע מעצבן.
אכן חשבתי על זה בזכות הרעיון עבור אנדרואיד שאחד מחברי הפורום יצר לפני מספר ימים (מחילה שאיני זוכר את שמו).לגבי השאלה כמה זמן ייקח לשלוח את המידע, בוא נחשב: בישראל יש בערך 30,00 תחנות אוטובוס. בנוסף מספר קו נע בין 1 ל-3 ספרות, עד 1000 אפשרויות לקו.
כלומר, הטלפון צריך לשלוח שאלה: בעוד כמה זמן יגיע קו מספר X לתחנה Y? (בהנחה שהתוכנה שומרת בזיכרון הפנימי של הטלפון את המיפוי של מספר-> שם תחנה.)
כדי לייצג מספר קו אנחנו צריכים 10 ביטים (לא בתים!) - 2^10 > 1000. באופן דומה מספיק 15 ביטים כדי לייצג מספר תחנה.
נניח שהתגובה של השרת היא 3 האוטובוסים הקרובים מהקו המבוקש. נניח חלון זמן של שעתיים הקרובות, מספיק 7 ביטים לכל אחד (2^7 = 128).
סכ"ה 46 ביטים. פחות מ-6 בתים. לשם ההשואה כל תו בעברית תופס שני בתים, כך ש-3 אותיות מההודעה הזו שוות מבחינת כמות המידע שעובר דרך הרשת.בלי להיכנס לאגוריתמי תיקון שגיאות וכיוצ"ב, נניח ששקט = 0, וצליל = 1. כעת צריך לברר איזה קצב של שקט/צליל אפשר להעביר בקו טלפון מודרני בלי להסתכן באיבוד מידע...
לרפרנס: קצב המידע במודם מלפני 30 שנה שחובר לקו טלפון נייח היה בין 300 ל-56,000 ביטים לשניה.
לא מופרך לקבל את המידע תוך פחות משניה (כולל 'לחיצת-ידיים', ECC ושאר ירקות).לסיכום: אם יצרן טלפונים כשרים רוצה להוסיף פונקציה חדשה שתיתן לו עדיפות ענקית על פני שאר השחקנים בשוק - זה רעיון ששוה לשחק בו.
-
@MAD כוונתי הייתה לתוכנה עם ממשק גרפי - לא שמע מעצבן.
אכן חשבתי על זה בזכות הרעיון עבור אנדרואיד שאחד מחברי הפורום יצר לפני מספר ימים (מחילה שאיני זוכר את שמו).לגבי השאלה כמה זמן ייקח לשלוח את המידע, בוא נחשב: בישראל יש בערך 30,00 תחנות אוטובוס. בנוסף מספר קו נע בין 1 ל-3 ספרות, עד 1000 אפשרויות לקו.
כלומר, הטלפון צריך לשלוח שאלה: בעוד כמה זמן יגיע קו מספר X לתחנה Y? (בהנחה שהתוכנה שומרת בזיכרון הפנימי של הטלפון את המיפוי של מספר-> שם תחנה.)
כדי לייצג מספר קו אנחנו צריכים 10 ביטים (לא בתים!) - 2^10 > 1000. באופן דומה מספיק 15 ביטים כדי לייצג מספר תחנה.
נניח שהתגובה של השרת היא 3 האוטובוסים הקרובים מהקו המבוקש. נניח חלון זמן של שעתיים הקרובות, מספיק 7 ביטים לכל אחד (2^7 = 128).
סכ"ה 46 ביטים. פחות מ-6 בתים. לשם ההשואה כל תו בעברית תופס שני בתים, כך ש-3 אותיות מההודעה הזו שוות מבחינת כמות המידע שעובר דרך הרשת.בלי להיכנס לאגוריתמי תיקון שגיאות וכיוצ"ב, נניח ששקט = 0, וצליל = 1. כעת צריך לברר איזה קצב של שקט/צליל אפשר להעביר בקו טלפון מודרני בלי להסתכן באיבוד מידע...
לרפרנס: קצב המידע במודם מלפני 30 שנה שחובר לקו טלפון נייח היה בין 300 ל-56,000 ביטים לשניה.
לא מופרך לקבל את המידע תוך פחות משניה (כולל 'לחיצת-ידיים', ECC ושאר ירקות).לסיכום: אם יצרן טלפונים כשרים רוצה להוסיף פונקציה חדשה שתיתן לו עדיפות ענקית על פני שאר השחקנים בשוק - זה רעיון ששוה לשחק בו.
@אורי חשבון יפה אבל לא מדוייק (לדגום יותר מ 40 פעמים בשנייה זה כבר מתחיל להיות מסוכן לשלמות הנתונים אז כולל סיבית יתירות crc וכו' זה יוצא יותר משניה אם כי עדיין בטווח הסביר)
בכל אופן קח בחשבון שיש קווים בשתי חברות עם אותו מספר (אבל רק אחד מהם עוצר בכל תחנה אז אולי אפשר לסנן לפי זה - תלוי במאגר שיהיה) -
כדי להפוך את הפרויקט הזה לשימושי באמת ביום-יום, אני חושב שכדאי לנו למקד את הדיון בשני כיוונים מרכזיים לשיפור:
התמקדות בדחיסת נתונים: להמשיך ולדחוף את יכולות הדחיסה לרמה הכי גבוהה שאפשר. ככל שהקובץ יהיה קטן יותר, ככה נצטרך פחות 'זמן אוויר' ופחות סיכוי לשגיאות.
מנגנון אישור (ACK) דרך מודל API: אם אנחנו רוצים לקחת את זה צעד קדימה ולחסוך את כל ה'ניפוי' (היתירות) שנדרש בגלל רעשים בשיחה קולית, הפתרון הוא יצירת אינטראקציה בין המחשב למערכת שזה אומר לפתח שלוחת API
הרעיון הוא בעצם לחסוך את שילוש הנתונים, וגם אם יש שגיאה אז מקסימום חוזרים על אותו בלוק בלבד ולא על כל הקובץוכנ"ל גם בקבלה
המערכת (IVR) תשלח את המידע בבלוקים.
המחשב שמקליט יבדוק כל בלוק בזמן אמת.
המחשב ידווח ל-API של המערכת אם הבלוק הגיע תקין או שגוי.
המערכת תמשיך לבלוק הבא רק אם התקבל אישור, ובמקרה של שגיאה – היא תשלח שוב רק את אותו בלוק ספציפי.מה דעתכם?
-
כדי להפוך את הפרויקט הזה לשימושי באמת ביום-יום, אני חושב שכדאי לנו למקד את הדיון בשני כיוונים מרכזיים לשיפור:
התמקדות בדחיסת נתונים: להמשיך ולדחוף את יכולות הדחיסה לרמה הכי גבוהה שאפשר. ככל שהקובץ יהיה קטן יותר, ככה נצטרך פחות 'זמן אוויר' ופחות סיכוי לשגיאות.
מנגנון אישור (ACK) דרך מודל API: אם אנחנו רוצים לקחת את זה צעד קדימה ולחסוך את כל ה'ניפוי' (היתירות) שנדרש בגלל רעשים בשיחה קולית, הפתרון הוא יצירת אינטראקציה בין המחשב למערכת שזה אומר לפתח שלוחת API
הרעיון הוא בעצם לחסוך את שילוש הנתונים, וגם אם יש שגיאה אז מקסימום חוזרים על אותו בלוק בלבד ולא על כל הקובץוכנ"ל גם בקבלה
המערכת (IVR) תשלח את המידע בבלוקים.
המחשב שמקליט יבדוק כל בלוק בזמן אמת.
המחשב ידווח ל-API של המערכת אם הבלוק הגיע תקין או שגוי.
המערכת תמשיך לבלוק הבא רק אם התקבל אישור, ובמקרה של שגיאה – היא תשלח שוב רק את אותו בלוק ספציפי.מה דעתכם?
@דוד-יצחק אני כתבתי על זה למעלה שיש אפשרות לעשות זאת ע״ שימוש עם מרכזיה שתבנה לבד עם אסטריסק, ואז את השולח בלוק של נתונים אחד אחרי השני והמחשב יענה עם DTMF ויאשר את הבלוקים ויבקש מחדש בלוקים שגויים כך אתה יכול לחסוך בשליחת הנתונים שוב ושוב ולהתמקד רק על הבלוקים הבעייתים
-
כדי להפוך את הפרויקט הזה לשימושי באמת ביום-יום, אני חושב שכדאי לנו למקד את הדיון בשני כיוונים מרכזיים לשיפור:
התמקדות בדחיסת נתונים: להמשיך ולדחוף את יכולות הדחיסה לרמה הכי גבוהה שאפשר. ככל שהקובץ יהיה קטן יותר, ככה נצטרך פחות 'זמן אוויר' ופחות סיכוי לשגיאות.
מנגנון אישור (ACK) דרך מודל API: אם אנחנו רוצים לקחת את זה צעד קדימה ולחסוך את כל ה'ניפוי' (היתירות) שנדרש בגלל רעשים בשיחה קולית, הפתרון הוא יצירת אינטראקציה בין המחשב למערכת שזה אומר לפתח שלוחת API
הרעיון הוא בעצם לחסוך את שילוש הנתונים, וגם אם יש שגיאה אז מקסימום חוזרים על אותו בלוק בלבד ולא על כל הקובץוכנ"ל גם בקבלה
המערכת (IVR) תשלח את המידע בבלוקים.
המחשב שמקליט יבדוק כל בלוק בזמן אמת.
המחשב ידווח ל-API של המערכת אם הבלוק הגיע תקין או שגוי.
המערכת תמשיך לבלוק הבא רק אם התקבל אישור, ובמקרה של שגיאה – היא תשלח שוב רק את אותו בלוק ספציפי.מה דעתכם?
@דוד-יצחק אחד הדברים שאפשר לעשות כדי לדחוס מעבר לדחיסה רגילה זה להוריד את השימוש ב-UTF-8 כי זה מכפיל כל ביט.
השאלה היא אם אפשר לשלוח מהתוכנה DTMF לימות המשיח שימשיך להקלטה הבאה אם זה עבר טוב או שישמיע שוב אם זה לא עבר טוב.
את התפריט לאחר כל קובץ אפשר להגדיר עם ההקדרה הבאה במודול השמעת קבצים כמו שמופיע כאן:ניתן להגדיר תפריט לאחר כל השמעה
after_play_tfr=tfr_more_optionsברירת מחדל של התפריט M1459 לשמיעה חוזרת הקישו 0 למעבר להשמעה הבאה הקישו 1
ניתן להגדיר את התפריט מחדש ראה כאןי
-
אולי יותר טוב או יכול לעזור
https://github.com/romanz/amodem?utm_source= -
@דוד-יצחק אחד הדברים שאפשר לעשות כדי לדחוס מעבר לדחיסה רגילה זה להוריד את השימוש ב-UTF-8 כי זה מכפיל כל ביט.
השאלה היא אם אפשר לשלוח מהתוכנה DTMF לימות המשיח שימשיך להקלטה הבאה אם זה עבר טוב או שישמיע שוב אם זה לא עבר טוב.
את התפריט לאחר כל קובץ אפשר להגדיר עם ההקדרה הבאה במודול השמעת קבצים כמו שמופיע כאן:ניתן להגדיר תפריט לאחר כל השמעה
after_play_tfr=tfr_more_optionsברירת מחדל של התפריט M1459 לשמיעה חוזרת הקישו 0 למעבר להשמעה הבאה הקישו 1
ניתן להגדיר את התפריט מחדש ראה כאןי
-
שלום לכולם!
אני רוצה לשתף אתכם בפרויקט מחקר ופיתוח שעבדתי עליו בתקופה האחרונה (הקוד נבנה ע"י gemini-3-pro-preview), שמוכיח שהבלתי אפשרי - אפשרי.
הצלחתי לפתח מודם אקוסטי (Data-over-Sound) שמאפשר להעביר קבצים (טקסט, תמונות, מסמכים) לתוך מחשב מנותק רשת (Offline/Air-Gapped) באמצעות שיחה קולית פשוטה מטלפון כשר! (דרך מערכת טלפונית בימות המשיח וכד')זה נשמע כמו מדע בדיוני או חזרה לשנות ה-80, אבל עם האלגוריתמים הנכונים, זה עובד בצורה מדהימה ב-2026.
למה זה טוב?עד היום, כדי להכניס למחשב מנותק מאינטרנט קובץ הייתם צריכים דיסק-און-קי פיזי.
הפיתוח הזה מאפשר לקבל קבצים דחופים למחשב המנותק ע"י השמעת הקלטה דרך הטלפון (רמקול או כבל AUX).
וכמובן עוד הרבה אפשרוית לפיתוח הלאה שאין להם סוף.
️ מה יש מתחת למכסה המנוע? ("פרוטוקול טיטניום")כדי להתגבר על האיכות הגרועה של קווי הטלפון (דחיסת GSM/G.729) ורעשי הרקע, בניתי פרוטוקול שידור סופר-קשוח שכולל:
- Triple Voting (הצבעת רוב): כל בייט נשלח 3 פעמים. אם רעש הורס אחד מהם, המחשב לוקח את הרוב הקובע.
- Reed-Solomon ECC: תיקון שגיאות מתמטי (כמו ב-QR Code) שמשחזר מידע שנמחק.
- Interleaving (ערבוב): פיזור הביטים כך שקטיעה רגעית בקו לא תמחק אות שלמה, אלא רק "שריטות" קטנות שקל לתקן.
- CRC32: בדיקת תקינות סופית - אם הקובץ לא שוחזר ב-100% דיוק, המערכת תודיע.
המטרה שלי בפרסוםאני מפרסם כאן את הקוד המלא (Python) כקוד פתוח.
הוכחתי את ההיתכנות הטכנית ברמה הגבוהה ביותר (DSP). המטרה שלי היא שמפתחים תותחים מכאן יקחו את הליבה הזו ויפתחו אותה למוצר נגיש, שירותי המרה, אפליקציות או כל רעיון אחר.קישור לתוכנה המקומפלת להורדה
זה הקובץ של התוכנה המקומפל ניתן ליצור באמצעותו קובץ שמע ולעלות לימות המשיח ואז להשמיע את ההקלטה דרך הטלפון לתוכנה במחשב המנותק מרשת.שימו לב זו גרסה ראשונית לצורך ניסוים בלבד! מומלץ לנסות בהתחלה להמיר קובץ טקסט בעל מילים בודדות בלבד!
הוראות שימוש (חובה לקרוא!)הפיזיקה של הסאונד היא עדינה. כדי שזה יעבוד "אחד לאחד", הקפידו על הכללים הבאים:
-
ווליום (הכי חשוב!):
- אין להגביר את הטלפון למקסימום! זה יוצר עיוות (Distortion) שהורס את האות הדיגיטלי.
- יש לכוון לעוצמה של כ-60%-70%.
- בתוכנה יש "מד עוצמה" – כוונו כך שהפס יהיה ירוק. אם הוא אדום, זה לא יעבוד.
-
חיבור AUX (מומלץ):
- הדרך הכי נקייה היא לחבר כבל AUX בין הטלפון לכניסת המיקרופון (Line-In) במחשב.
- שים לב: במחשבים ניידים עם שקע בודד (Combo Jack), כבל AUX רגיל לא יעבוד כמיקרופון! צריך כרטיס קול USB פשוט (עולה כמה שקלים) או מפצל ייעודי.
-
שימוש ברמקול (אקוסטי):
- אפשרי בהחלט!
- יש להיות בחדר שקט יחסית.
- יש להצמיד את רמקול הטלפון למיקרופון של המחשב.
המדריך הטכני למפתחכדי להריץ את זה, תצטרכו Python מותקן על המחשב.
1. התקנת הספריות הדרושות:
פתחו CMD והריצו:pip install numpy scipy sounddevice soundfile reedsolo pyinstaller2. הקוד המלא:
העתיקו את הקוד הבא ושמרו אותו בקובץ בשםmain.py. הקוד כולל ממשק גרפי (GUI), מקודד (Encoder) ומפענח (Decoder).import sys import os import struct import threading import base64 import time import queue import zlib import datetime import numpy as np import sounddevice as sd import soundfile as sf from scipy.io import wavfile from scipy import signal as sig from reedsolo import RSCodec, ReedSolomonError import tkinter as tk from tkinter import filedialog, messagebox, ttk # === הגדרות מודם: "Titanium Edition" === SAMPLE_RATE = 8000 BAUD_RATE = 40 # מהירות יציבה FREQ_MARK = 900 # תדרים אופטימליים לקו טלפון FREQ_SPACE = 1700 RS_ECC_SYMBOLS = 20 # תיקון שגיאות בסיסי (השילוש עושה את רוב העבודה) INTERLEAVE_STEP = 40 # פיזור רחב מאוד # סנכרון SYNC_DURATION = 0.8 t_sync = np.arange(int(SAMPLE_RATE * SYNC_DURATION)) / SAMPLE_RATE SYNC_SIGNAL = sig.chirp(t_sync, f0=FREQ_MARK, f1=FREQ_SPACE, t1=SYNC_DURATION, method='linear') # Magic Byte MAGIC_BITS = "01000010" # 'B' TEXT_EXTENSIONS = ['.txt', '.csv', '.json', '.xml', '.html', '.php', '.py', '.js', '.ini', '.log'] # === לוגים === def write_log(msg): ts = datetime.datetime.now().strftime("%H:%M:%S") print(f"[{ts}] {msg}") try: with open("LOG_TITANIUM.txt", "a", encoding="utf-8") as f: f.write(f"[{ts}] {msg}\n") except: pass # === לוגיקה מתמטית === def interleave_bits(bits): n = len(bits) padding = (INTERLEAVE_STEP - (n % INTERLEAVE_STEP)) % INTERLEAVE_STEP bits_padded = bits + [0] * padding interleaved = [0] * len(bits_padded) rows = len(bits_padded) // INTERLEAVE_STEP for i in range(len(bits_padded)): row = i // INTERLEAVE_STEP col = i % INTERLEAVE_STEP new_idx = col * rows + row interleaved[new_idx] = bits_padded[i] return interleaved, padding def deinterleave_bits(bits, padding): n = len(bits) rows = n // INTERLEAVE_STEP if rows == 0: return bits deinterleaved = [0] * n for i in range(n): col = i // rows row = i % rows original_idx = row * INTERLEAVE_STEP + col if original_idx < n: deinterleaved[original_idx] = bits[i] if padding > 0: return deinterleaved[:-padding] return deinterleaved def text_to_bits(data_bytes): bits = [] for byte in data_bytes: for i in range(7, -1, -1): bits.append((byte >> i) & 1) return bits def bits_to_bytes(bits): chars = [] for b in range(0, len(bits), 8): byte = bits[b:b+8] if len(byte) < 8: break chars.append(int(''.join(map(str, byte)), 2)) return bytes(chars) def vote_byte(b1, b2, b3): if b1 == b2 or b1 == b3: return b1 if b2 == b3: return b2 return b1 # ברירת מחדל # === Encoder === def generate_signal(filename, file_content, output_wav): try: write_log(f"מקודד: {filename}") ext = os.path.splitext(filename)[1].lower() is_text = ext in TEXT_EXTENSIONS fname_bytes = os.path.basename(filename).encode('utf-8') if is_text: content_bytes = file_content type_flag = 1 else: content_bytes = base64.b64encode(file_content) type_flag = 0 if len(fname_bytes) > 200: fname_bytes = fname_bytes[:200] # 1. אריזה + CRC crc = zlib.crc32(content_bytes) & 0xffffffff packet = struct.pack('B', len(fname_bytes)) + fname_bytes + struct.pack('B', type_flag) + content_bytes + struct.pack('>I', crc) # 2. קידוד RS rsc = RSCodec(RS_ECC_SYMBOLS) encoded_data = rsc.encode(packet) # 3. שילוש הנתונים (Triple Redundancy) על הכל! # זה השדרוג הגדול. כל בייט הופך ל-3. tripled_data = bytearray() for b in encoded_data: tripled_data.extend([b, b, b]) # 4. המרה לביטים raw_bits = text_to_bits(tripled_data) # 5. ערבוב (Interleaving) # זה מפזר את השילוש, כך שרעש לא ימחק את כל ה-3 עותקים shuffled_bits, padding = interleave_bits(raw_bits) # 6. כותרת: [Magic] [Length] [Padding] magic_bits = text_to_bits(bytes([0x42])) # 'B' # האורך הוא של הביטים המעורבבים header_info = struct.pack('>IB', len(shuffled_bits), padding) header_bits = text_to_bits(header_info) header_bits_tripled = header_bits * 3 final_stream_bits = magic_bits + header_bits_tripled + shuffled_bits # 7. יצירת אודיו samples_per_bit = int(SAMPLE_RATE / BAUD_RATE) t_wake = np.arange(int(SAMPLE_RATE * 3.0)) / SAMPLE_RATE wake = np.sin(2 * np.pi * FREQ_SPACE * t_wake) t_bit = np.arange(samples_per_bit) / SAMPLE_RATE phase = 0 waves = [] for bit in final_stream_bits: freq = FREQ_MARK if bit == 1 else FREQ_SPACE w = np.sin(2 * np.pi * freq * t_bit + phase) waves.append(w) phase += 2 * np.pi * freq * (samples_per_bit / SAMPLE_RATE) phase %= 2 * np.pi final_audio = np.concatenate([wake, SYNC_SIGNAL, np.concatenate(waves)]) final_audio = sig.lfilter([1.0, -0.8], [1.0], final_audio) m = np.max(np.abs(final_audio)) final_audio = (final_audio / m * 32767).astype(np.int16) wavfile.write(output_wav, SAMPLE_RATE, final_audio) return True, "קובץ טיטניום נוצר! (Triple Payload)" except Exception as e: return False, str(e) # === Decoder === def decode_signal(input_wav, output_folder): try: with open("LOG_TITANIUM.txt", "w", encoding="utf-8") as f: f.write("=== LOG START ===\n") rate, data = wavfile.read(input_wav) if len(data.shape) > 1: data = data[:, 0] sos = sig.butter(6, [600, 2000], btype='bandpass', fs=SAMPLE_RATE, output='sos') clean = sig.sosfilt(sos, data.astype(float)) mx = np.max(np.abs(clean)) if mx < 0.01: return False, "שקט מדי" clean /= mx # 1. סנכרון corr = sig.correlate(clean, SYNC_SIGNAL, mode='valid') peak = np.argmax(np.abs(corr)) if np.abs(corr[peak]) < 3: return False, "לא נמצא אות סנכרון" approx_start = peak + len(SYNC_SIGNAL) relevant = clean[approx_start:] spb = int(SAMPLE_RATE / BAUD_RATE) num_bits = len(relevant) // spb t = np.arange(spb) / SAMPLE_RATE ref_m = np.exp(-2j * np.pi * FREQ_MARK * t) ref_s = np.exp(-2j * np.pi * FREQ_SPACE * t) bits_str = "" for i in range(num_bits): chunk = relevant[i*spb : (i+1)*spb] pm = np.abs(np.dot(chunk, ref_m))**2 ps = np.abs(np.dot(chunk, ref_s))**2 bits_str += "1" if pm > ps else "0" # 2. Magic Byte magic_idx = bits_str.find(MAGIC_BITS) if magic_idx == -1: inv = bits_str.replace('0','x').replace('1','0').replace('x','1') magic_idx = inv.find(MAGIC_BITS) if magic_idx != -1: bits_str = inv write_log("היפוך פאזה זוהה") else: return False, "Magic Byte לא נמצא" data_start = magic_idx + 8 # 3. Header Voting if len(bits_str) < data_start + 120: return False, "קצר מדי" h_part = bits_str[data_start : data_start + 120] h_chunks = [h_part[0:40], h_part[40:80], h_part[80:120]] if h_chunks[0] == h_chunks[1] or h_chunks[0] == h_chunks[2]: final_h = h_chunks[0] elif h_chunks[1] == h_chunks[2]: final_h = h_chunks[1] else: final_h = h_chunks[0] try: h_bytes = bits_to_bytes([int(x) for x in final_h]) data_len, padding = struct.unpack('>IB', h_bytes) write_log(f"אורך מעורבב: {data_len}, ריפוד: {padding}") except: return False, "שגיאת כותרת" if data_len > 10000000 or data_len == 0: return False, f"אורך לא הגיוני {data_len}" # 4. חילוץ וסידור מחדש (De-Interleave) payload_idx = data_start + 120 if len(bits_str) < payload_idx + data_len: bits_str = bits_str.ljust(payload_idx + data_len, '0') shuffled = [int(b) for b in bits_str[payload_idx : payload_idx + data_len]] raw_bits = deinterleave_bits(shuffled, padding) raw_bytes = bits_to_bytes(raw_bits) # 5. הצבעת רוב על התוכן (Payload Voting) - החלק החדש! # יש לנו כעת רצף בייטים שכל אחד מהם חוזר 3 פעמים # [A, A, A, B, B, B, ...] reconstructed_payload = bytearray() fixed_count = 0 for i in range(0, len(raw_bytes), 3): chunk = raw_bytes[i : i+3] if len(chunk) < 3: break b1, b2, b3 = chunk[0], chunk[1], chunk[2] if b1 == b2 and b2 == b3: # הסכמה מלאה res = b1 else: # נדרש תיקון res = vote_byte(b1, b2, b3) fixed_count += 1 reconstructed_payload.append(res) write_log(f"תוקנו באמצעות Voting: {fixed_count} בייטים") # 6. RS Decode (שכבה שנייה) try: rsc = RSCodec(RS_ECC_SYMBOLS) decoded_packet = rsc.decode(bytes(reconstructed_payload))[0] # 7. פירוק ptr = 0 name_len = decoded_packet[ptr]; ptr+=1 filename = decoded_packet[ptr:ptr+name_len].decode('utf-8', errors='ignore'); ptr+=name_len file_type = decoded_packet[ptr]; ptr+=1 content = decoded_packet[ptr:-4] rec_crc = struct.unpack('>I', decoded_packet[-4:])[0] calc_crc = zlib.crc32(content) & 0xffffffff if calc_crc != rec_crc: write_log(f"CRC שגוי: {rec_crc} != {calc_crc}") # אם ה-RS עבר, אולי נשמור בכל זאת עם אזהרה? # לא, נחמיר return False, "שגיאת CRC (תוכן לא תואם)" final_data = content if file_type == 0: try: final_data = base64.b64decode(content) except: pass path = os.path.join(output_folder, filename) with open(path, 'wb') as f: f.write(final_data) return True, f"הצלחה! {filename}" except ReedSolomonError: write_log("RS נכשל") return False, "תיקון שגיאות סופי נכשל" except Exception as e: return False, str(e) # === GUI === class ModemApp: def __init__(self, root): self.root = root self.root.title("Acoustic Modem - TITANIUM EDITION") self.root.geometry("600x700") self.audio_buffer = [] self.is_rec = False tabs = ttk.Notebook(root) t1 = ttk.Frame(tabs); tabs.add(t1, text="Encode") t2 = ttk.Frame(tabs); tabs.add(t2, text="Decode") tabs.pack(fill="both", expand=True) tk.Button(t1, text="צור קובץ (Titanium - Full Triple)", command=self.do_enc, bg="#007bff", fg="white", font=("Arial", 14)).pack(pady=50) self.lbl_enc = tk.Label(t1, text="", fg="green") self.lbl_enc.pack() self.canvas_vol = tk.Canvas(t2, width=400, height=30, bg="black") self.canvas_vol.pack(pady=10) self.vol_bar = self.canvas_vol.create_rectangle(0,0,0,30, fill="green") self.dev_combo = ttk.Combobox(t2, values=self.get_devs(), width=50) if self.get_devs(): self.dev_combo.current(0) self.dev_combo.pack(pady=10) self.btn_rec = tk.Button(t2, text="🔴 התחל הקלטה", command=self.toggle_rec, bg="#dc3545", fg="white", font=("Arial", 14)) self.btn_rec.pack(pady=20) self.lbl_status = tk.Label(t2, text="מוכן", font=("Arial", 14, "bold")) self.lbl_status.pack() tk.Button(t2, text="טען קובץ WAV", command=self.load_file).pack(pady=20) tk.Button(t2, text="פתח לוג", command=lambda: os.startfile("LOG_TITANIUM.txt") if os.path.exists("LOG_TITANIUM.txt") else None).pack() def get_devs(self): try: return [f"{i}: {d['name']}" for i,d in enumerate(sd.query_devices()) if d['max_input_channels']>0] except: return [] def do_enc(self): fn = filedialog.askopenfilename() if not fn: return sn = filedialog.asksaveasfilename(defaultextension=".wav", filetypes=[("WAV","*.wav")]) if not sn: return try: with open(fn,'rb') as f: c=f.read() ok, msg = generate_signal(fn, c, sn) if ok: messagebox.showinfo("Success", msg) else: messagebox.showerror("Error", msg) except Exception as e: messagebox.showerror("Error", str(e)) def toggle_rec(self): if not self.is_rec: self.is_rec = True self.btn_rec.config(text="⏹️ סיים ופענח", bg="#333") self.audio_buffer = [] self.lbl_status.config(text="מקליט...", fg="red") threading.Thread(target=self.rec_thread).start() else: self.is_rec = False self.btn_rec.config(text="⏳ מעבד...", state="disabled") self.lbl_status.config(text="מפענח (Voting & RS)...", fg="blue") def update_vol(self, indata): peak = np.max(np.abs(indata)) / 32768.0 w = min(400, int(peak * 400)) c = "green" if w < 300 else "red" self.canvas_vol.coords(self.vol_bar, 0, 0, w, 30) self.canvas_vol.itemconfig(self.vol_bar, fill=c) def rec_thread(self): idx = int(self.dev_combo.get().split(":")[0]) if self.dev_combo.get() else None def cb(indata, f, t, s): if self.is_rec: self.audio_buffer.append(indata.copy()) try: self.root.after(10, lambda: self.update_vol(indata)) except: pass try: with sd.InputStream(device=idx, samplerate=44100, channels=1, dtype='int16', callback=cb): while self.is_rec: sd.sleep(100) self.process_audio() except Exception as e: self.is_rec = False messagebox.showerror("Error", str(e)) self.root.after(0, lambda: self.btn_rec.config(text="🔴 התחל הקלטה", bg="#dc3545", state="normal")) def process_audio(self): if not self.audio_buffer: return full = np.concatenate(self.audio_buffer, axis=0) sf.write("DEBUG_INPUT.wav", full, 44100) num = int(len(full) * SAMPLE_RATE / 44100) data = sig.resample(full, num).astype(np.int16) tmp = "temp_dec.wav" wavfile.write(tmp, SAMPLE_RATE, data) dl = "Recovered_Files" if not os.path.exists(dl): os.makedirs(dl) ok, msg = decode_signal(tmp, dl) self.root.after(0, lambda: self.finish(ok, msg)) def load_file(self): fn = filedialog.askopenfilename() if not fn: return self.lbl_status.config(text="מעבד...", fg="blue") dl = "Recovered_Files" if not os.path.exists(dl): os.makedirs(dl) ok, msg = decode_signal(fn, dl) self.finish(ok, msg) def finish(self, ok, msg): self.is_rec = False self.btn_rec.config(text="🔴 התחל הקלטה", bg="#dc3545", state="normal") self.lbl_status.config(text="הסתיים", fg="black") if ok: messagebox.showinfo("Success", msg) else: messagebox.showerror("Failed", msg) if __name__ == "__main__": root = tk.Tk() app = ModemApp(root) root.mainloop()3. יצירת קובץ EXE (קומפילציה):
כדי להפוך את זה לתוכנה ניידת ללא צורך בהתקנת פייתון הריצו את הפקודה הבאה בתקיה שבה נמצא הקוד למעלה:pyinstaller --onefile --noconsole --name "AcousticModem" main.pyהקובץ המוכן יחכה לכם בתיקייה
dist.
בהצלחה!

-
@דוד-יצחק אני כתבתי על זה למעלה שיש אפשרות לעשות זאת ע״ שימוש עם מרכזיה שתבנה לבד עם אסטריסק, ואז את השולח בלוק של נתונים אחד אחרי השני והמחשב יענה עם DTMF ויאשר את הבלוקים ויבקש מחדש בלוקים שגויים כך אתה יכול לחסוך בשליחת הנתונים שוב ושוב ולהתמקד רק על הבלוקים הבעייתים
@phpman כתב בהמלצה |
[פרויקט מהפכני] העברת קבצים למחשב ללא אינטרנט דרך שיחה קולית (טלפון כשר!) - הקוד המלא:@דוד-יצחק אני כתבתי על זה למעלה שיש אפשרות לעשות זאת ע״ שימוש עם מרכזיה שתבנה לבד עם אסטריסק, ואז את השולח בלוק של נתונים אחד אחרי השני והמחשב יענה עם DTMF ויאשר את הבלוקים ויבקש מחדש בלוקים שגויים כך אתה יכול לחסוך בשליחת הנתונים שוב ושוב ולהתמקד רק על הבלוקים הבעייתים
אוקיי אבל לא חייב ממש להקים מרכזיה אפשר גם עם שלוחת API אם כי יש לך פחות שליטה
-
בדקתי עוד כיון של הקטנת האורך של קובץ שמע וזה דווקא די הצליח
לקחתי קובץ המרתי אותו עם הסקריפט שיש בהודעה הראשונה והוא היה בעורך של 16 שעות
לקחתי את אותו קובץ דחסתי אותו בדחיסה מקסימלית עם 7ZIP (בהמלצת AI)והוא נהיה רק שעה וחצי -
שלום לכולם!
אני רוצה לשתף אתכם בפרויקט מחקר ופיתוח שעבדתי עליו בתקופה האחרונה (הקוד נבנה ע"י gemini-3-pro-preview), שמוכיח שהבלתי אפשרי - אפשרי.
הצלחתי לפתח מודם אקוסטי (Data-over-Sound) שמאפשר להעביר קבצים (טקסט, תמונות, מסמכים) לתוך מחשב מנותק רשת (Offline/Air-Gapped) באמצעות שיחה קולית פשוטה מטלפון כשר! (דרך מערכת טלפונית בימות המשיח וכד')זה נשמע כמו מדע בדיוני או חזרה לשנות ה-80, אבל עם האלגוריתמים הנכונים, זה עובד בצורה מדהימה ב-2026.
למה זה טוב?עד היום, כדי להכניס למחשב מנותק מאינטרנט קובץ הייתם צריכים דיסק-און-קי פיזי.
הפיתוח הזה מאפשר לקבל קבצים דחופים למחשב המנותק ע"י השמעת הקלטה דרך הטלפון (רמקול או כבל AUX).
וכמובן עוד הרבה אפשרוית לפיתוח הלאה שאין להם סוף.
️ מה יש מתחת למכסה המנוע? ("פרוטוקול טיטניום")כדי להתגבר על האיכות הגרועה של קווי הטלפון (דחיסת GSM/G.729) ורעשי הרקע, בניתי פרוטוקול שידור סופר-קשוח שכולל:
- Triple Voting (הצבעת רוב): כל בייט נשלח 3 פעמים. אם רעש הורס אחד מהם, המחשב לוקח את הרוב הקובע.
- Reed-Solomon ECC: תיקון שגיאות מתמטי (כמו ב-QR Code) שמשחזר מידע שנמחק.
- Interleaving (ערבוב): פיזור הביטים כך שקטיעה רגעית בקו לא תמחק אות שלמה, אלא רק "שריטות" קטנות שקל לתקן.
- CRC32: בדיקת תקינות סופית - אם הקובץ לא שוחזר ב-100% דיוק, המערכת תודיע.
המטרה שלי בפרסוםאני מפרסם כאן את הקוד המלא (Python) כקוד פתוח.
הוכחתי את ההיתכנות הטכנית ברמה הגבוהה ביותר (DSP). המטרה שלי היא שמפתחים תותחים מכאן יקחו את הליבה הזו ויפתחו אותה למוצר נגיש, שירותי המרה, אפליקציות או כל רעיון אחר.קישור לתוכנה המקומפלת להורדה
זה הקובץ של התוכנה המקומפל ניתן ליצור באמצעותו קובץ שמע ולעלות לימות המשיח ואז להשמיע את ההקלטה דרך הטלפון לתוכנה במחשב המנותק מרשת.שימו לב זו גרסה ראשונית לצורך ניסוים בלבד! מומלץ לנסות בהתחלה להמיר קובץ טקסט בעל מילים בודדות בלבד!
הוראות שימוש (חובה לקרוא!)הפיזיקה של הסאונד היא עדינה. כדי שזה יעבוד "אחד לאחד", הקפידו על הכללים הבאים:
-
ווליום (הכי חשוב!):
- אין להגביר את הטלפון למקסימום! זה יוצר עיוות (Distortion) שהורס את האות הדיגיטלי.
- יש לכוון לעוצמה של כ-60%-70%.
- בתוכנה יש "מד עוצמה" – כוונו כך שהפס יהיה ירוק. אם הוא אדום, זה לא יעבוד.
-
חיבור AUX (מומלץ):
- הדרך הכי נקייה היא לחבר כבל AUX בין הטלפון לכניסת המיקרופון (Line-In) במחשב.
- שים לב: במחשבים ניידים עם שקע בודד (Combo Jack), כבל AUX רגיל לא יעבוד כמיקרופון! צריך כרטיס קול USB פשוט (עולה כמה שקלים) או מפצל ייעודי.
-
שימוש ברמקול (אקוסטי):
- אפשרי בהחלט!
- יש להיות בחדר שקט יחסית.
- יש להצמיד את רמקול הטלפון למיקרופון של המחשב.
המדריך הטכני למפתחכדי להריץ את זה, תצטרכו Python מותקן על המחשב.
1. התקנת הספריות הדרושות:
פתחו CMD והריצו:pip install numpy scipy sounddevice soundfile reedsolo pyinstaller2. הקוד המלא:
העתיקו את הקוד הבא ושמרו אותו בקובץ בשםmain.py. הקוד כולל ממשק גרפי (GUI), מקודד (Encoder) ומפענח (Decoder).import sys import os import struct import threading import base64 import time import queue import zlib import datetime import numpy as np import sounddevice as sd import soundfile as sf from scipy.io import wavfile from scipy import signal as sig from reedsolo import RSCodec, ReedSolomonError import tkinter as tk from tkinter import filedialog, messagebox, ttk # === הגדרות מודם: "Titanium Edition" === SAMPLE_RATE = 8000 BAUD_RATE = 40 # מהירות יציבה FREQ_MARK = 900 # תדרים אופטימליים לקו טלפון FREQ_SPACE = 1700 RS_ECC_SYMBOLS = 20 # תיקון שגיאות בסיסי (השילוש עושה את רוב העבודה) INTERLEAVE_STEP = 40 # פיזור רחב מאוד # סנכרון SYNC_DURATION = 0.8 t_sync = np.arange(int(SAMPLE_RATE * SYNC_DURATION)) / SAMPLE_RATE SYNC_SIGNAL = sig.chirp(t_sync, f0=FREQ_MARK, f1=FREQ_SPACE, t1=SYNC_DURATION, method='linear') # Magic Byte MAGIC_BITS = "01000010" # 'B' TEXT_EXTENSIONS = ['.txt', '.csv', '.json', '.xml', '.html', '.php', '.py', '.js', '.ini', '.log'] # === לוגים === def write_log(msg): ts = datetime.datetime.now().strftime("%H:%M:%S") print(f"[{ts}] {msg}") try: with open("LOG_TITANIUM.txt", "a", encoding="utf-8") as f: f.write(f"[{ts}] {msg}\n") except: pass # === לוגיקה מתמטית === def interleave_bits(bits): n = len(bits) padding = (INTERLEAVE_STEP - (n % INTERLEAVE_STEP)) % INTERLEAVE_STEP bits_padded = bits + [0] * padding interleaved = [0] * len(bits_padded) rows = len(bits_padded) // INTERLEAVE_STEP for i in range(len(bits_padded)): row = i // INTERLEAVE_STEP col = i % INTERLEAVE_STEP new_idx = col * rows + row interleaved[new_idx] = bits_padded[i] return interleaved, padding def deinterleave_bits(bits, padding): n = len(bits) rows = n // INTERLEAVE_STEP if rows == 0: return bits deinterleaved = [0] * n for i in range(n): col = i // rows row = i % rows original_idx = row * INTERLEAVE_STEP + col if original_idx < n: deinterleaved[original_idx] = bits[i] if padding > 0: return deinterleaved[:-padding] return deinterleaved def text_to_bits(data_bytes): bits = [] for byte in data_bytes: for i in range(7, -1, -1): bits.append((byte >> i) & 1) return bits def bits_to_bytes(bits): chars = [] for b in range(0, len(bits), 8): byte = bits[b:b+8] if len(byte) < 8: break chars.append(int(''.join(map(str, byte)), 2)) return bytes(chars) def vote_byte(b1, b2, b3): if b1 == b2 or b1 == b3: return b1 if b2 == b3: return b2 return b1 # ברירת מחדל # === Encoder === def generate_signal(filename, file_content, output_wav): try: write_log(f"מקודד: {filename}") ext = os.path.splitext(filename)[1].lower() is_text = ext in TEXT_EXTENSIONS fname_bytes = os.path.basename(filename).encode('utf-8') if is_text: content_bytes = file_content type_flag = 1 else: content_bytes = base64.b64encode(file_content) type_flag = 0 if len(fname_bytes) > 200: fname_bytes = fname_bytes[:200] # 1. אריזה + CRC crc = zlib.crc32(content_bytes) & 0xffffffff packet = struct.pack('B', len(fname_bytes)) + fname_bytes + struct.pack('B', type_flag) + content_bytes + struct.pack('>I', crc) # 2. קידוד RS rsc = RSCodec(RS_ECC_SYMBOLS) encoded_data = rsc.encode(packet) # 3. שילוש הנתונים (Triple Redundancy) על הכל! # זה השדרוג הגדול. כל בייט הופך ל-3. tripled_data = bytearray() for b in encoded_data: tripled_data.extend([b, b, b]) # 4. המרה לביטים raw_bits = text_to_bits(tripled_data) # 5. ערבוב (Interleaving) # זה מפזר את השילוש, כך שרעש לא ימחק את כל ה-3 עותקים shuffled_bits, padding = interleave_bits(raw_bits) # 6. כותרת: [Magic] [Length] [Padding] magic_bits = text_to_bits(bytes([0x42])) # 'B' # האורך הוא של הביטים המעורבבים header_info = struct.pack('>IB', len(shuffled_bits), padding) header_bits = text_to_bits(header_info) header_bits_tripled = header_bits * 3 final_stream_bits = magic_bits + header_bits_tripled + shuffled_bits # 7. יצירת אודיו samples_per_bit = int(SAMPLE_RATE / BAUD_RATE) t_wake = np.arange(int(SAMPLE_RATE * 3.0)) / SAMPLE_RATE wake = np.sin(2 * np.pi * FREQ_SPACE * t_wake) t_bit = np.arange(samples_per_bit) / SAMPLE_RATE phase = 0 waves = [] for bit in final_stream_bits: freq = FREQ_MARK if bit == 1 else FREQ_SPACE w = np.sin(2 * np.pi * freq * t_bit + phase) waves.append(w) phase += 2 * np.pi * freq * (samples_per_bit / SAMPLE_RATE) phase %= 2 * np.pi final_audio = np.concatenate([wake, SYNC_SIGNAL, np.concatenate(waves)]) final_audio = sig.lfilter([1.0, -0.8], [1.0], final_audio) m = np.max(np.abs(final_audio)) final_audio = (final_audio / m * 32767).astype(np.int16) wavfile.write(output_wav, SAMPLE_RATE, final_audio) return True, "קובץ טיטניום נוצר! (Triple Payload)" except Exception as e: return False, str(e) # === Decoder === def decode_signal(input_wav, output_folder): try: with open("LOG_TITANIUM.txt", "w", encoding="utf-8") as f: f.write("=== LOG START ===\n") rate, data = wavfile.read(input_wav) if len(data.shape) > 1: data = data[:, 0] sos = sig.butter(6, [600, 2000], btype='bandpass', fs=SAMPLE_RATE, output='sos') clean = sig.sosfilt(sos, data.astype(float)) mx = np.max(np.abs(clean)) if mx < 0.01: return False, "שקט מדי" clean /= mx # 1. סנכרון corr = sig.correlate(clean, SYNC_SIGNAL, mode='valid') peak = np.argmax(np.abs(corr)) if np.abs(corr[peak]) < 3: return False, "לא נמצא אות סנכרון" approx_start = peak + len(SYNC_SIGNAL) relevant = clean[approx_start:] spb = int(SAMPLE_RATE / BAUD_RATE) num_bits = len(relevant) // spb t = np.arange(spb) / SAMPLE_RATE ref_m = np.exp(-2j * np.pi * FREQ_MARK * t) ref_s = np.exp(-2j * np.pi * FREQ_SPACE * t) bits_str = "" for i in range(num_bits): chunk = relevant[i*spb : (i+1)*spb] pm = np.abs(np.dot(chunk, ref_m))**2 ps = np.abs(np.dot(chunk, ref_s))**2 bits_str += "1" if pm > ps else "0" # 2. Magic Byte magic_idx = bits_str.find(MAGIC_BITS) if magic_idx == -1: inv = bits_str.replace('0','x').replace('1','0').replace('x','1') magic_idx = inv.find(MAGIC_BITS) if magic_idx != -1: bits_str = inv write_log("היפוך פאזה זוהה") else: return False, "Magic Byte לא נמצא" data_start = magic_idx + 8 # 3. Header Voting if len(bits_str) < data_start + 120: return False, "קצר מדי" h_part = bits_str[data_start : data_start + 120] h_chunks = [h_part[0:40], h_part[40:80], h_part[80:120]] if h_chunks[0] == h_chunks[1] or h_chunks[0] == h_chunks[2]: final_h = h_chunks[0] elif h_chunks[1] == h_chunks[2]: final_h = h_chunks[1] else: final_h = h_chunks[0] try: h_bytes = bits_to_bytes([int(x) for x in final_h]) data_len, padding = struct.unpack('>IB', h_bytes) write_log(f"אורך מעורבב: {data_len}, ריפוד: {padding}") except: return False, "שגיאת כותרת" if data_len > 10000000 or data_len == 0: return False, f"אורך לא הגיוני {data_len}" # 4. חילוץ וסידור מחדש (De-Interleave) payload_idx = data_start + 120 if len(bits_str) < payload_idx + data_len: bits_str = bits_str.ljust(payload_idx + data_len, '0') shuffled = [int(b) for b in bits_str[payload_idx : payload_idx + data_len]] raw_bits = deinterleave_bits(shuffled, padding) raw_bytes = bits_to_bytes(raw_bits) # 5. הצבעת רוב על התוכן (Payload Voting) - החלק החדש! # יש לנו כעת רצף בייטים שכל אחד מהם חוזר 3 פעמים # [A, A, A, B, B, B, ...] reconstructed_payload = bytearray() fixed_count = 0 for i in range(0, len(raw_bytes), 3): chunk = raw_bytes[i : i+3] if len(chunk) < 3: break b1, b2, b3 = chunk[0], chunk[1], chunk[2] if b1 == b2 and b2 == b3: # הסכמה מלאה res = b1 else: # נדרש תיקון res = vote_byte(b1, b2, b3) fixed_count += 1 reconstructed_payload.append(res) write_log(f"תוקנו באמצעות Voting: {fixed_count} בייטים") # 6. RS Decode (שכבה שנייה) try: rsc = RSCodec(RS_ECC_SYMBOLS) decoded_packet = rsc.decode(bytes(reconstructed_payload))[0] # 7. פירוק ptr = 0 name_len = decoded_packet[ptr]; ptr+=1 filename = decoded_packet[ptr:ptr+name_len].decode('utf-8', errors='ignore'); ptr+=name_len file_type = decoded_packet[ptr]; ptr+=1 content = decoded_packet[ptr:-4] rec_crc = struct.unpack('>I', decoded_packet[-4:])[0] calc_crc = zlib.crc32(content) & 0xffffffff if calc_crc != rec_crc: write_log(f"CRC שגוי: {rec_crc} != {calc_crc}") # אם ה-RS עבר, אולי נשמור בכל זאת עם אזהרה? # לא, נחמיר return False, "שגיאת CRC (תוכן לא תואם)" final_data = content if file_type == 0: try: final_data = base64.b64decode(content) except: pass path = os.path.join(output_folder, filename) with open(path, 'wb') as f: f.write(final_data) return True, f"הצלחה! {filename}" except ReedSolomonError: write_log("RS נכשל") return False, "תיקון שגיאות סופי נכשל" except Exception as e: return False, str(e) # === GUI === class ModemApp: def __init__(self, root): self.root = root self.root.title("Acoustic Modem - TITANIUM EDITION") self.root.geometry("600x700") self.audio_buffer = [] self.is_rec = False tabs = ttk.Notebook(root) t1 = ttk.Frame(tabs); tabs.add(t1, text="Encode") t2 = ttk.Frame(tabs); tabs.add(t2, text="Decode") tabs.pack(fill="both", expand=True) tk.Button(t1, text="צור קובץ (Titanium - Full Triple)", command=self.do_enc, bg="#007bff", fg="white", font=("Arial", 14)).pack(pady=50) self.lbl_enc = tk.Label(t1, text="", fg="green") self.lbl_enc.pack() self.canvas_vol = tk.Canvas(t2, width=400, height=30, bg="black") self.canvas_vol.pack(pady=10) self.vol_bar = self.canvas_vol.create_rectangle(0,0,0,30, fill="green") self.dev_combo = ttk.Combobox(t2, values=self.get_devs(), width=50) if self.get_devs(): self.dev_combo.current(0) self.dev_combo.pack(pady=10) self.btn_rec = tk.Button(t2, text="🔴 התחל הקלטה", command=self.toggle_rec, bg="#dc3545", fg="white", font=("Arial", 14)) self.btn_rec.pack(pady=20) self.lbl_status = tk.Label(t2, text="מוכן", font=("Arial", 14, "bold")) self.lbl_status.pack() tk.Button(t2, text="טען קובץ WAV", command=self.load_file).pack(pady=20) tk.Button(t2, text="פתח לוג", command=lambda: os.startfile("LOG_TITANIUM.txt") if os.path.exists("LOG_TITANIUM.txt") else None).pack() def get_devs(self): try: return [f"{i}: {d['name']}" for i,d in enumerate(sd.query_devices()) if d['max_input_channels']>0] except: return [] def do_enc(self): fn = filedialog.askopenfilename() if not fn: return sn = filedialog.asksaveasfilename(defaultextension=".wav", filetypes=[("WAV","*.wav")]) if not sn: return try: with open(fn,'rb') as f: c=f.read() ok, msg = generate_signal(fn, c, sn) if ok: messagebox.showinfo("Success", msg) else: messagebox.showerror("Error", msg) except Exception as e: messagebox.showerror("Error", str(e)) def toggle_rec(self): if not self.is_rec: self.is_rec = True self.btn_rec.config(text="⏹️ סיים ופענח", bg="#333") self.audio_buffer = [] self.lbl_status.config(text="מקליט...", fg="red") threading.Thread(target=self.rec_thread).start() else: self.is_rec = False self.btn_rec.config(text="⏳ מעבד...", state="disabled") self.lbl_status.config(text="מפענח (Voting & RS)...", fg="blue") def update_vol(self, indata): peak = np.max(np.abs(indata)) / 32768.0 w = min(400, int(peak * 400)) c = "green" if w < 300 else "red" self.canvas_vol.coords(self.vol_bar, 0, 0, w, 30) self.canvas_vol.itemconfig(self.vol_bar, fill=c) def rec_thread(self): idx = int(self.dev_combo.get().split(":")[0]) if self.dev_combo.get() else None def cb(indata, f, t, s): if self.is_rec: self.audio_buffer.append(indata.copy()) try: self.root.after(10, lambda: self.update_vol(indata)) except: pass try: with sd.InputStream(device=idx, samplerate=44100, channels=1, dtype='int16', callback=cb): while self.is_rec: sd.sleep(100) self.process_audio() except Exception as e: self.is_rec = False messagebox.showerror("Error", str(e)) self.root.after(0, lambda: self.btn_rec.config(text="🔴 התחל הקלטה", bg="#dc3545", state="normal")) def process_audio(self): if not self.audio_buffer: return full = np.concatenate(self.audio_buffer, axis=0) sf.write("DEBUG_INPUT.wav", full, 44100) num = int(len(full) * SAMPLE_RATE / 44100) data = sig.resample(full, num).astype(np.int16) tmp = "temp_dec.wav" wavfile.write(tmp, SAMPLE_RATE, data) dl = "Recovered_Files" if not os.path.exists(dl): os.makedirs(dl) ok, msg = decode_signal(tmp, dl) self.root.after(0, lambda: self.finish(ok, msg)) def load_file(self): fn = filedialog.askopenfilename() if not fn: return self.lbl_status.config(text="מעבד...", fg="blue") dl = "Recovered_Files" if not os.path.exists(dl): os.makedirs(dl) ok, msg = decode_signal(fn, dl) self.finish(ok, msg) def finish(self, ok, msg): self.is_rec = False self.btn_rec.config(text="🔴 התחל הקלטה", bg="#dc3545", state="normal") self.lbl_status.config(text="הסתיים", fg="black") if ok: messagebox.showinfo("Success", msg) else: messagebox.showerror("Failed", msg) if __name__ == "__main__": root = tk.Tk() app = ModemApp(root) root.mainloop()3. יצירת קובץ EXE (קומפילציה):
כדי להפוך את זה לתוכנה ניידת ללא צורך בהתקנת פייתון הריצו את הפקודה הבאה בתקיה שבה נמצא הקוד למעלה:pyinstaller --onefile --noconsole --name "AcousticModem" main.pyהקובץ המוכן יחכה לכם בתיקייה
dist.
בהצלחה!

מי שרוצה דוגמה חייה לאחת הגרסאות המתקדמות שלי זה הקישור:
https://drive.google.com/file/d/1WtkPXOtK0Dm0_yI*****Csmf7FbsBG8NLrkAb/view?usp=sharingלהוריד כוכביות ולא לפרסם ללא הכוכביות.
יש הקלטה מוכנה להשמעה בקו שמספרו: 0794946346.
יש להיכנס לתוכנה ללחוץ על "התחל האזנה" ואז לחייג לקו ולהיכנס לשלוחה 1 לעשות על רמקול לסתום אוזנים ואז תתחילו לראות את הטקסט שג'ימני יצר שהוא מסביר על התוכנה.
שימו לב שבקוד הזה הטקסט חולק למקטעים ולכן גם אם יתפספס קטע אחד אתם תראו את הקטע הבא אחרי כמה שניות.הקוד בספוילר:
import sys import os import struct import threading import zlib import base64 import time import queue import datetime import numpy as np import sounddevice as sd import soundfile as sf from scipy.io import wavfile from scipy import signal as sig from reedsolo import RSCodec, ReedSolomonError import tkinter as tk from tkinter import filedialog, messagebox, ttk, scrolledtext # === הגדרות ליבה: STREAMER V3 === SAMPLE_RATE = 8000 BAUD_RATE = 50 RS_ECC_SYMBOLS = 20 INTERLEAVE_STEP = 10 FREQS = [1000, 1400, 1800, 2200] # 4-FSK # סנכרון SYNC_DURATION = 0.4 t_sync = np.arange(int(SAMPLE_RATE * SYNC_DURATION)) / SAMPLE_RATE SYNC_SIGNAL = sig.chirp(t_sync, f0=1000, f1=2200, t1=SYNC_DURATION, method='linear') # גודל מנה (תווים) CHUNK_SIZE = 120 TEXT_EXTENSIONS = ['.txt', '.csv', '.json', '.xml', '.html', '.php', '.py', '.js', '.ini', '.log'] # === לוגיקה מתמטית (Modem Engine) === def interleave(bits): n = len(bits) pad = (INTERLEAVE_STEP - (n % INTERLEAVE_STEP)) % INTERLEAVE_STEP src = bits + [0]*pad dst = [0]*len(src) rows = len(src)//INTERLEAVE_STEP for i in range(len(src)): dst[(i%INTERLEAVE_STEP)*rows + (i//INTERLEAVE_STEP)] = src[i] return dst, pad def deinterleave(bits, pad): n = len(bits) rows = n//INTERLEAVE_STEP dst = [0]*n for i in range(n): col = i // rows row = i % rows dst[row*INTERLEAVE_STEP + col] = bits[i] return dst[:-pad] if pad else dst def text2bits(b): res = [] for x in b: for i in range(7,-1,-1): res.append((x>>i)&1) return res def bits2bytes(b): res = bytearray() for i in range(0, len(b), 8): byte = b[i:i+8] if len(byte)<8: break v = 0 for bit in byte: v = (v<<1)|bit res.append(v) return bytes(res) # === Encoder === def encode_packet(data_bytes, packet_id, total_packets): try: compressed = zlib.compress(data_bytes, level=9) header = struct.pack('>HHH', packet_id, total_packets, len(compressed)) payload = header + compressed rsc = RSCodec(RS_ECC_SYMBOLS) encoded = rsc.encode(payload) bits = text2bits(encoded) shuffled, pad = interleave(bits) meta_header = struct.pack('>BH', pad, len(shuffled)) meta_bits = text2bits(meta_header) final_bits = meta_bits + shuffled if len(final_bits) % 2 != 0: final_bits.append(0) sps = int(SAMPLE_RATE / BAUD_RATE) t_sym = np.arange(sps) / SAMPLE_RATE phase = 0 waves = [] for i in range(0, len(final_bits), 2): val = (final_bits[i] << 1) | final_bits[i+1] freq = FREQS[val] w = np.sin(2 * np.pi * freq * t_sym + phase) waves.append(w) phase += 2 * np.pi * freq * (sps / SAMPLE_RATE) phase %= 2 * np.pi silence = np.zeros(int(SAMPLE_RATE * 0.25)) return np.concatenate([SYNC_SIGNAL, np.concatenate(waves), silence]) except: return None def generate_stream_file(filename, file_content, output_wav): try: ext = os.path.splitext(filename)[1].lower() is_text = ext in TEXT_EXTENSIONS chunks = [] if is_text: try: text_str = file_content.decode('utf-8', errors='ignore') curr = "" words = text_str.replace('\n', ' \n ').split(' ') for word in words: if len(word.encode('utf-8')) > CHUNK_SIZE: if curr: chunks.append(curr.encode('utf-8')); curr = "" chunks.append(word.encode('utf-8')); continue if len(curr.encode('utf-8')) + len(word.encode('utf-8')) < CHUNK_SIZE: curr += word + " " else: chunks.append(curr.encode('utf-8')); curr = word + " " if curr: chunks.append(curr.encode('utf-8')) except: chunks = [file_content[i:i+CHUNK_SIZE] for i in range(0, len(file_content), CHUNK_SIZE)] else: chunks = [file_content[i:i+CHUNK_SIZE] for i in range(0, len(file_content), CHUNK_SIZE)] total = len(chunks) audio_parts = [] t_wake = np.arange(int(SAMPLE_RATE * 2.0)) / SAMPLE_RATE wake = np.sin(2 * np.pi * 2200 * t_wake) audio_parts.append(wake) for i, chunk in enumerate(chunks): pkt = encode_packet(chunk, i+1, total) if pkt is not None: audio_parts.append(pkt) final_float = np.concatenate(audio_parts) mx = np.max(np.abs(final_float)) final_int16 = (final_float / mx * 32767).astype(np.int16) if mx > 0 else final_float.astype(np.int16) wavfile.write(output_wav, SAMPLE_RATE, final_int16) return True, f"{total}" except Exception as e: return False, str(e) # === Decoder === def decode_packet_from_stream(clean_audio): try: sps = int(SAMPLE_RATE / BAUD_RATE) num_syms = len(clean_audio) // sps if num_syms < 50: return None, 0 t = np.arange(sps) / SAMPLE_RATE refs = [np.exp(-2j * np.pi * f * t) for f in FREQS] bits_str = "" for i in range(num_syms): chunk = clean_audio[i*sps : (i+1)*sps] energies = [np.abs(np.dot(chunk, r))**2 for r in refs] bits_str += str((np.argmax(energies)>>1)&1) + str(np.argmax(energies)&1) if len(bits_str) < 24: return None, 0 meta_bits = [int(b) for b in bits_str[:24]] pad, bit_len = struct.unpack('>BH', bits2bytes(meta_bits)) if bit_len > 10000 or bit_len == 0: return None, 0 if len(bits_str) < 24 + bit_len: return None, 0 payload_bits = [int(b) for b in bits_str[24 : 24+bit_len]] raw_bits = deinterleave(payload_bits, pad) packet_bytes = bits2bytes(raw_bits) decoded = RSCodec(RS_ECC_SYMBOLS).decode(packet_bytes)[0] pkt_id, pkt_total, content_len = struct.unpack('>HHH', decoded[:6]) content = zlib.decompress(decoded[6 : 6+content_len]) samples_consumed = ((24 + bit_len) // 2) * sps return (pkt_id, pkt_total, content), samples_consumed except: return None, 0 # === GUI Professional === class AcousticReader: def __init__(self, root): self.root = root self.root.title("Acoustic Reader Pro") self.root.geometry("600x700") self.root.configure(bg="#2C3E50") self.audio_buffer = np.array([], dtype=float) self.is_rec = False self.q = queue.Queue() self.received_packets = {} # Styles style = ttk.Style() style.theme_use('clam') style.configure("TNotebook", background="#2C3E50", borderwidth=0) style.configure("TNotebook.Tab", background="#34495E", foreground="white", padding=[20, 10], font=("Segoe UI", 11)) style.map("TNotebook.Tab", background=[("selected", "#3498DB")]) style.configure("TFrame", background="#ECF0F1") tabs = ttk.Notebook(root) t_decode = ttk.Frame(tabs) # קבלה ראשון t_encode = ttk.Frame(tabs) # יצירה שני tabs.add(t_decode, text=" 📥 קליטת נתונים (חיבור AUX) ") tabs.add(t_encode, text=" 📤 יצירת קובץ (למנהל) ") tabs.pack(fill="both", expand=True, padx=10, pady=10) self.setup_decoder(t_decode) self.setup_encoder(t_encode) self.root.after(100, self.process_queue) def setup_decoder(self, parent): frame = tk.Frame(parent, bg="#ECF0F1") frame.pack(fill="both", expand=True, padx=20, pady=20) # Header tk.Label(frame, text="מערכת לקליטת עלונים וקבצים", font=("Segoe UI", 18, "bold"), bg="#ECF0F1", fg="#2C3E50").pack(pady=10) # Status & Vol status_frame = tk.Frame(frame, bg="#BDC3C7", bd=1, relief="sunken") status_frame.pack(fill="x", pady=10) self.canvas_vol = tk.Canvas(status_frame, width=300, height=15, bg="#95a5a6", highlightthickness=0) self.canvas_vol.pack(side="left", padx=10, pady=5) self.vol_bar = self.canvas_vol.create_rectangle(0,0,0,15, fill="#2ECC71") self.lbl_status = tk.Label(status_frame, text="ממתין להפעלה...", font=("Segoe UI", 10), bg="#BDC3C7") self.lbl_status.pack(side="right", padx=10) # Main Button self.btn_rec = tk.Button(frame, text="התחל האזנה", command=self.toggle_rec, font=("Segoe UI", 16, "bold"), bg="#27AE60", fg="white", activebackground="#2ECC71", activeforeground="white", bd=0, padx=20, pady=10) self.btn_rec.pack(pady=10) # Text Area tk.Label(frame, text="תוכן הקובץ המתקבל:", font=("Segoe UI", 11), bg="#ECF0F1", anchor="w").pack(fill="x") self.txt_display = scrolledtext.ScrolledText(frame, height=15, font=("Segoe UI", 12), bd=2, relief="flat") self.txt_display.pack(fill="both", expand=True, pady=5) self.txt_display.tag_config("info", foreground="#E67E22", font=("Segoe UI", 10, "bold")) self.txt_display.tag_config("content", foreground="#2C3E50") # Footer buttons btn_frame = tk.Frame(frame, bg="#ECF0F1") btn_frame.pack(pady=5) tk.Button(btn_frame, text="נקה מסך", command=self.clear_screen, bg="#95a5a6", fg="white", bd=0, padx=10).pack(side="left", padx=5) tk.Button(btn_frame, text="העתק טקסט", command=self.copy_text, bg="#3498DB", fg="white", bd=0, padx=10).pack(side="left", padx=5) def setup_encoder(self, parent): frame = tk.Frame(parent, bg="#ECF0F1") frame.pack(fill="both", expand=True, padx=20, pady=20) tk.Label(frame, text="יצירת קובץ שמע למערכת הטלפונית", font=("Segoe UI", 16, "bold"), bg="#ECF0F1", fg="#2C3E50").pack(pady=20) tk.Label(frame, text="בחר קובץ טקסט/מסמך להמרה.\nהמערכת תיצור קובץ WAV מותאם לשידור.", font=("Segoe UI", 11), bg="#ECF0F1", fg="#7F8C8D").pack(pady=10) tk.Button(frame, text="בחר קובץ והמר", command=self.do_enc, font=("Segoe UI", 14), bg="#2980B9", fg="white", activebackground="#3498DB", activeforeground="white", bd=0, padx=20, pady=15).pack(pady=40) self.lbl_enc = tk.Label(frame, text="", font=("Segoe UI", 11, "bold"), bg="#ECF0F1", fg="#27AE60") self.lbl_enc.pack() def clear_screen(self): self.txt_display.delete('1.0', tk.END) self.received_packets = {} def copy_text(self): self.root.clipboard_clear() self.root.clipboard_append(self.txt_display.get("1.0", tk.END)) messagebox.showinfo("הועתק", "התוכן הועתק ללוח") def do_enc(self): fn = filedialog.askopenfilename() if not fn: return sn = filedialog.asksaveasfilename(defaultextension=".wav", filetypes=[("WAV","*.wav")]) if not sn: return self.lbl_enc.config(text="מעבד... אנא המתן", fg="blue") self.root.update() try: with open(fn,'rb') as f: c=f.read() ok, msg = generate_stream_file(fn, c, sn) if ok: self.lbl_enc.config(text=f"הצלחה! נוצר קובץ עם {msg} מנות.", fg="#27AE60") else: self.lbl_enc.config(text=f"שגיאה: {msg}", fg="#C0392B") except Exception as e: self.lbl_enc.config(text=f"שגיאה: {e}", fg="#C0392B") def toggle_rec(self): if not self.is_rec: self.is_rec = True self.btn_rec.config(text="⏹️ עצור הקשבה", bg="#C0392B", activebackground="#E74C3C") self.audio_buffer = np.array([], dtype=float) self.lbl_status.config(text="מקשיב לקו... (נא להשמיע)", fg="#2980B9") self.clear_screen() threading.Thread(target=self.rec_thread).start() else: self.is_rec = False self.btn_rec.config(text="התחל האזנה", bg="#27AE60", activebackground="#2ECC71") self.lbl_status.config(text="מוכן") def update_vol(self, indata): peak = np.max(np.abs(indata)) / 32768.0 w = min(300, int(peak * 300)) c = "#2ECC71" if w < 200 else "#E74C3C" self.canvas_vol.coords(self.vol_bar, 0, 0, w, 15) self.canvas_vol.itemconfig(self.vol_bar, fill=c) def rec_thread(self): def cb(i, f, t, s): self.q.put(i.copy()) # שימוש בברירת מחדל של המערכת (ללא device index) try: with sd.InputStream(samplerate=SAMPLE_RATE, channels=1, callback=cb): while self.is_rec: sd.sleep(100) except Exception as e: self.is_rec = False messagebox.showerror("Error", f"שגיאת מיקרופון: {e}") def process_queue(self): try: while not self.q.empty(): chunk = self.q.get_nowait().flatten() self.update_vol(chunk * 32767) self.audio_buffer = np.concatenate((self.audio_buffer, chunk)) if len(self.audio_buffer) > SAMPLE_RATE * 15: self.audio_buffer = self.audio_buffer[-SAMPLE_RATE*15:] self.try_decode() except: pass self.root.after(50, self.process_queue) def try_decode(self): if len(self.audio_buffer) < 4000: return buff = self.audio_buffer.copy() sos = sig.butter(4, [800, 2400], btype='bandpass', fs=SAMPLE_RATE, output='sos') clean = sig.sosfilt(sos, buff) mx = np.max(np.abs(clean)) if mx > 0.01: clean /= mx corr = sig.correlate(clean, SYNC_SIGNAL, mode='valid') peaks, _ = sig.find_peaks(np.abs(corr), height=5, distance=SAMPLE_RATE) for peak_idx in peaks: start_data = peak_idx + len(SYNC_SIGNAL) if len(clean) > start_data + 500: audio_chunk = clean[start_data:] result, consumed = decode_packet_from_stream(audio_chunk) if result: pkt_id, total, content = result if pkt_id not in self.received_packets: self.received_packets[pkt_id] = True try: text = content.decode('utf-8') self.append_text(f"--- חלק {pkt_id} מתוך {total} ---\n", "info") self.append_text(text + "\n", "content") except: self.append_text(f"--- חלק {pkt_id}/{total} (קובץ בינארי) ---\n", "info") cut_pos = start_data + consumed + 100 if cut_pos < len(self.audio_buffer): self.audio_buffer = self.audio_buffer[cut_pos:] else: self.audio_buffer = np.array([], dtype=float) break def append_text(self, text, tag): self.txt_display.insert(tk.END, text, tag) self.txt_display.see(tk.END) if __name__ == "__main__": root = tk.Tk() app = AcousticReader(root) root.mainloop() -
מי שרוצה דוגמה חייה לאחת הגרסאות המתקדמות שלי זה הקישור:
https://drive.google.com/file/d/1WtkPXOtK0Dm0_yI*****Csmf7FbsBG8NLrkAb/view?usp=sharingלהוריד כוכביות ולא לפרסם ללא הכוכביות.
יש הקלטה מוכנה להשמעה בקו שמספרו: 0794946346.
יש להיכנס לתוכנה ללחוץ על "התחל האזנה" ואז לחייג לקו ולהיכנס לשלוחה 1 לעשות על רמקול לסתום אוזנים ואז תתחילו לראות את הטקסט שג'ימני יצר שהוא מסביר על התוכנה.
שימו לב שבקוד הזה הטקסט חולק למקטעים ולכן גם אם יתפספס קטע אחד אתם תראו את הקטע הבא אחרי כמה שניות.הקוד בספוילר:
import sys import os import struct import threading import zlib import base64 import time import queue import datetime import numpy as np import sounddevice as sd import soundfile as sf from scipy.io import wavfile from scipy import signal as sig from reedsolo import RSCodec, ReedSolomonError import tkinter as tk from tkinter import filedialog, messagebox, ttk, scrolledtext # === הגדרות ליבה: STREAMER V3 === SAMPLE_RATE = 8000 BAUD_RATE = 50 RS_ECC_SYMBOLS = 20 INTERLEAVE_STEP = 10 FREQS = [1000, 1400, 1800, 2200] # 4-FSK # סנכרון SYNC_DURATION = 0.4 t_sync = np.arange(int(SAMPLE_RATE * SYNC_DURATION)) / SAMPLE_RATE SYNC_SIGNAL = sig.chirp(t_sync, f0=1000, f1=2200, t1=SYNC_DURATION, method='linear') # גודל מנה (תווים) CHUNK_SIZE = 120 TEXT_EXTENSIONS = ['.txt', '.csv', '.json', '.xml', '.html', '.php', '.py', '.js', '.ini', '.log'] # === לוגיקה מתמטית (Modem Engine) === def interleave(bits): n = len(bits) pad = (INTERLEAVE_STEP - (n % INTERLEAVE_STEP)) % INTERLEAVE_STEP src = bits + [0]*pad dst = [0]*len(src) rows = len(src)//INTERLEAVE_STEP for i in range(len(src)): dst[(i%INTERLEAVE_STEP)*rows + (i//INTERLEAVE_STEP)] = src[i] return dst, pad def deinterleave(bits, pad): n = len(bits) rows = n//INTERLEAVE_STEP dst = [0]*n for i in range(n): col = i // rows row = i % rows dst[row*INTERLEAVE_STEP + col] = bits[i] return dst[:-pad] if pad else dst def text2bits(b): res = [] for x in b: for i in range(7,-1,-1): res.append((x>>i)&1) return res def bits2bytes(b): res = bytearray() for i in range(0, len(b), 8): byte = b[i:i+8] if len(byte)<8: break v = 0 for bit in byte: v = (v<<1)|bit res.append(v) return bytes(res) # === Encoder === def encode_packet(data_bytes, packet_id, total_packets): try: compressed = zlib.compress(data_bytes, level=9) header = struct.pack('>HHH', packet_id, total_packets, len(compressed)) payload = header + compressed rsc = RSCodec(RS_ECC_SYMBOLS) encoded = rsc.encode(payload) bits = text2bits(encoded) shuffled, pad = interleave(bits) meta_header = struct.pack('>BH', pad, len(shuffled)) meta_bits = text2bits(meta_header) final_bits = meta_bits + shuffled if len(final_bits) % 2 != 0: final_bits.append(0) sps = int(SAMPLE_RATE / BAUD_RATE) t_sym = np.arange(sps) / SAMPLE_RATE phase = 0 waves = [] for i in range(0, len(final_bits), 2): val = (final_bits[i] << 1) | final_bits[i+1] freq = FREQS[val] w = np.sin(2 * np.pi * freq * t_sym + phase) waves.append(w) phase += 2 * np.pi * freq * (sps / SAMPLE_RATE) phase %= 2 * np.pi silence = np.zeros(int(SAMPLE_RATE * 0.25)) return np.concatenate([SYNC_SIGNAL, np.concatenate(waves), silence]) except: return None def generate_stream_file(filename, file_content, output_wav): try: ext = os.path.splitext(filename)[1].lower() is_text = ext in TEXT_EXTENSIONS chunks = [] if is_text: try: text_str = file_content.decode('utf-8', errors='ignore') curr = "" words = text_str.replace('\n', ' \n ').split(' ') for word in words: if len(word.encode('utf-8')) > CHUNK_SIZE: if curr: chunks.append(curr.encode('utf-8')); curr = "" chunks.append(word.encode('utf-8')); continue if len(curr.encode('utf-8')) + len(word.encode('utf-8')) < CHUNK_SIZE: curr += word + " " else: chunks.append(curr.encode('utf-8')); curr = word + " " if curr: chunks.append(curr.encode('utf-8')) except: chunks = [file_content[i:i+CHUNK_SIZE] for i in range(0, len(file_content), CHUNK_SIZE)] else: chunks = [file_content[i:i+CHUNK_SIZE] for i in range(0, len(file_content), CHUNK_SIZE)] total = len(chunks) audio_parts = [] t_wake = np.arange(int(SAMPLE_RATE * 2.0)) / SAMPLE_RATE wake = np.sin(2 * np.pi * 2200 * t_wake) audio_parts.append(wake) for i, chunk in enumerate(chunks): pkt = encode_packet(chunk, i+1, total) if pkt is not None: audio_parts.append(pkt) final_float = np.concatenate(audio_parts) mx = np.max(np.abs(final_float)) final_int16 = (final_float / mx * 32767).astype(np.int16) if mx > 0 else final_float.astype(np.int16) wavfile.write(output_wav, SAMPLE_RATE, final_int16) return True, f"{total}" except Exception as e: return False, str(e) # === Decoder === def decode_packet_from_stream(clean_audio): try: sps = int(SAMPLE_RATE / BAUD_RATE) num_syms = len(clean_audio) // sps if num_syms < 50: return None, 0 t = np.arange(sps) / SAMPLE_RATE refs = [np.exp(-2j * np.pi * f * t) for f in FREQS] bits_str = "" for i in range(num_syms): chunk = clean_audio[i*sps : (i+1)*sps] energies = [np.abs(np.dot(chunk, r))**2 for r in refs] bits_str += str((np.argmax(energies)>>1)&1) + str(np.argmax(energies)&1) if len(bits_str) < 24: return None, 0 meta_bits = [int(b) for b in bits_str[:24]] pad, bit_len = struct.unpack('>BH', bits2bytes(meta_bits)) if bit_len > 10000 or bit_len == 0: return None, 0 if len(bits_str) < 24 + bit_len: return None, 0 payload_bits = [int(b) for b in bits_str[24 : 24+bit_len]] raw_bits = deinterleave(payload_bits, pad) packet_bytes = bits2bytes(raw_bits) decoded = RSCodec(RS_ECC_SYMBOLS).decode(packet_bytes)[0] pkt_id, pkt_total, content_len = struct.unpack('>HHH', decoded[:6]) content = zlib.decompress(decoded[6 : 6+content_len]) samples_consumed = ((24 + bit_len) // 2) * sps return (pkt_id, pkt_total, content), samples_consumed except: return None, 0 # === GUI Professional === class AcousticReader: def __init__(self, root): self.root = root self.root.title("Acoustic Reader Pro") self.root.geometry("600x700") self.root.configure(bg="#2C3E50") self.audio_buffer = np.array([], dtype=float) self.is_rec = False self.q = queue.Queue() self.received_packets = {} # Styles style = ttk.Style() style.theme_use('clam') style.configure("TNotebook", background="#2C3E50", borderwidth=0) style.configure("TNotebook.Tab", background="#34495E", foreground="white", padding=[20, 10], font=("Segoe UI", 11)) style.map("TNotebook.Tab", background=[("selected", "#3498DB")]) style.configure("TFrame", background="#ECF0F1") tabs = ttk.Notebook(root) t_decode = ttk.Frame(tabs) # קבלה ראשון t_encode = ttk.Frame(tabs) # יצירה שני tabs.add(t_decode, text=" 📥 קליטת נתונים (חיבור AUX) ") tabs.add(t_encode, text=" 📤 יצירת קובץ (למנהל) ") tabs.pack(fill="both", expand=True, padx=10, pady=10) self.setup_decoder(t_decode) self.setup_encoder(t_encode) self.root.after(100, self.process_queue) def setup_decoder(self, parent): frame = tk.Frame(parent, bg="#ECF0F1") frame.pack(fill="both", expand=True, padx=20, pady=20) # Header tk.Label(frame, text="מערכת לקליטת עלונים וקבצים", font=("Segoe UI", 18, "bold"), bg="#ECF0F1", fg="#2C3E50").pack(pady=10) # Status & Vol status_frame = tk.Frame(frame, bg="#BDC3C7", bd=1, relief="sunken") status_frame.pack(fill="x", pady=10) self.canvas_vol = tk.Canvas(status_frame, width=300, height=15, bg="#95a5a6", highlightthickness=0) self.canvas_vol.pack(side="left", padx=10, pady=5) self.vol_bar = self.canvas_vol.create_rectangle(0,0,0,15, fill="#2ECC71") self.lbl_status = tk.Label(status_frame, text="ממתין להפעלה...", font=("Segoe UI", 10), bg="#BDC3C7") self.lbl_status.pack(side="right", padx=10) # Main Button self.btn_rec = tk.Button(frame, text="התחל האזנה", command=self.toggle_rec, font=("Segoe UI", 16, "bold"), bg="#27AE60", fg="white", activebackground="#2ECC71", activeforeground="white", bd=0, padx=20, pady=10) self.btn_rec.pack(pady=10) # Text Area tk.Label(frame, text="תוכן הקובץ המתקבל:", font=("Segoe UI", 11), bg="#ECF0F1", anchor="w").pack(fill="x") self.txt_display = scrolledtext.ScrolledText(frame, height=15, font=("Segoe UI", 12), bd=2, relief="flat") self.txt_display.pack(fill="both", expand=True, pady=5) self.txt_display.tag_config("info", foreground="#E67E22", font=("Segoe UI", 10, "bold")) self.txt_display.tag_config("content", foreground="#2C3E50") # Footer buttons btn_frame = tk.Frame(frame, bg="#ECF0F1") btn_frame.pack(pady=5) tk.Button(btn_frame, text="נקה מסך", command=self.clear_screen, bg="#95a5a6", fg="white", bd=0, padx=10).pack(side="left", padx=5) tk.Button(btn_frame, text="העתק טקסט", command=self.copy_text, bg="#3498DB", fg="white", bd=0, padx=10).pack(side="left", padx=5) def setup_encoder(self, parent): frame = tk.Frame(parent, bg="#ECF0F1") frame.pack(fill="both", expand=True, padx=20, pady=20) tk.Label(frame, text="יצירת קובץ שמע למערכת הטלפונית", font=("Segoe UI", 16, "bold"), bg="#ECF0F1", fg="#2C3E50").pack(pady=20) tk.Label(frame, text="בחר קובץ טקסט/מסמך להמרה.\nהמערכת תיצור קובץ WAV מותאם לשידור.", font=("Segoe UI", 11), bg="#ECF0F1", fg="#7F8C8D").pack(pady=10) tk.Button(frame, text="בחר קובץ והמר", command=self.do_enc, font=("Segoe UI", 14), bg="#2980B9", fg="white", activebackground="#3498DB", activeforeground="white", bd=0, padx=20, pady=15).pack(pady=40) self.lbl_enc = tk.Label(frame, text="", font=("Segoe UI", 11, "bold"), bg="#ECF0F1", fg="#27AE60") self.lbl_enc.pack() def clear_screen(self): self.txt_display.delete('1.0', tk.END) self.received_packets = {} def copy_text(self): self.root.clipboard_clear() self.root.clipboard_append(self.txt_display.get("1.0", tk.END)) messagebox.showinfo("הועתק", "התוכן הועתק ללוח") def do_enc(self): fn = filedialog.askopenfilename() if not fn: return sn = filedialog.asksaveasfilename(defaultextension=".wav", filetypes=[("WAV","*.wav")]) if not sn: return self.lbl_enc.config(text="מעבד... אנא המתן", fg="blue") self.root.update() try: with open(fn,'rb') as f: c=f.read() ok, msg = generate_stream_file(fn, c, sn) if ok: self.lbl_enc.config(text=f"הצלחה! נוצר קובץ עם {msg} מנות.", fg="#27AE60") else: self.lbl_enc.config(text=f"שגיאה: {msg}", fg="#C0392B") except Exception as e: self.lbl_enc.config(text=f"שגיאה: {e}", fg="#C0392B") def toggle_rec(self): if not self.is_rec: self.is_rec = True self.btn_rec.config(text="⏹️ עצור הקשבה", bg="#C0392B", activebackground="#E74C3C") self.audio_buffer = np.array([], dtype=float) self.lbl_status.config(text="מקשיב לקו... (נא להשמיע)", fg="#2980B9") self.clear_screen() threading.Thread(target=self.rec_thread).start() else: self.is_rec = False self.btn_rec.config(text="התחל האזנה", bg="#27AE60", activebackground="#2ECC71") self.lbl_status.config(text="מוכן") def update_vol(self, indata): peak = np.max(np.abs(indata)) / 32768.0 w = min(300, int(peak * 300)) c = "#2ECC71" if w < 200 else "#E74C3C" self.canvas_vol.coords(self.vol_bar, 0, 0, w, 15) self.canvas_vol.itemconfig(self.vol_bar, fill=c) def rec_thread(self): def cb(i, f, t, s): self.q.put(i.copy()) # שימוש בברירת מחדל של המערכת (ללא device index) try: with sd.InputStream(samplerate=SAMPLE_RATE, channels=1, callback=cb): while self.is_rec: sd.sleep(100) except Exception as e: self.is_rec = False messagebox.showerror("Error", f"שגיאת מיקרופון: {e}") def process_queue(self): try: while not self.q.empty(): chunk = self.q.get_nowait().flatten() self.update_vol(chunk * 32767) self.audio_buffer = np.concatenate((self.audio_buffer, chunk)) if len(self.audio_buffer) > SAMPLE_RATE * 15: self.audio_buffer = self.audio_buffer[-SAMPLE_RATE*15:] self.try_decode() except: pass self.root.after(50, self.process_queue) def try_decode(self): if len(self.audio_buffer) < 4000: return buff = self.audio_buffer.copy() sos = sig.butter(4, [800, 2400], btype='bandpass', fs=SAMPLE_RATE, output='sos') clean = sig.sosfilt(sos, buff) mx = np.max(np.abs(clean)) if mx > 0.01: clean /= mx corr = sig.correlate(clean, SYNC_SIGNAL, mode='valid') peaks, _ = sig.find_peaks(np.abs(corr), height=5, distance=SAMPLE_RATE) for peak_idx in peaks: start_data = peak_idx + len(SYNC_SIGNAL) if len(clean) > start_data + 500: audio_chunk = clean[start_data:] result, consumed = decode_packet_from_stream(audio_chunk) if result: pkt_id, total, content = result if pkt_id not in self.received_packets: self.received_packets[pkt_id] = True try: text = content.decode('utf-8') self.append_text(f"--- חלק {pkt_id} מתוך {total} ---\n", "info") self.append_text(text + "\n", "content") except: self.append_text(f"--- חלק {pkt_id}/{total} (קובץ בינארי) ---\n", "info") cut_pos = start_data + consumed + 100 if cut_pos < len(self.audio_buffer): self.audio_buffer = self.audio_buffer[cut_pos:] else: self.audio_buffer = np.array([], dtype=float) break def append_text(self, text, tag): self.txt_display.insert(tk.END, text, tag) self.txt_display.see(tk.END) if __name__ == "__main__": root = tk.Tk() app = AcousticReader(root) root.mainloop() -
@כבוד-הרב
אוקיי אני רואה ששונה הממשק
אבל אני רואה שהוא לוקח הרבה פחות אורך אודיו
מה שונה בלוגיקה? -
@דוד-יצחק לא בדקתי את הלוגיקה אבל הגיוני שאם זה עכשיו מותאם לטקסט אז במקום רצפים של 8 ביטים הוא יכול להסתפק ב 5 או כמה שהוא בחר לייצג
@מתכנת-חובב עכשיו שזה לטקסט זה המיר לי 2,697 תווים ל-7:10 דקות!
