DwarfLab DWARF mini Smart-Teleskop

img_6a0af087cc11b9.32714038.webp

 

Skywatcher Maksutov Teleskop MC 127/1500 SkyMax BD AZ-S GoTo

img_6a0af0c788c003.42747064.png

Die Steuerung erfolgt mit einem Raspberry Pi 3 Model B V1.2

img_6a15a699056080.07488466.png

Während der Steuerung des Teleskops können per Kamera auch Aufnahmen gemacht werden. Damit die Steuerung bequem per App erfolgen kann, läuft der rPi0w mit ser2net als SkyFi Nachbau.

ser2net im aktuellen Debian-Release nutzt keine .conf Datei zur Konfiguration, sondern .yaml. Die sollen grundsätzlich robuster und besser "zu lesen" sein, als die .conf Dateien, aber ich habe da immer wieder Probleme mit Leerzeichen, Umbrüchen, etc. ...
Eine funktionierende .yaml für die vorhandene SkyWatcher SynScan GoTo ist die Folgende:

### HM-MOD-RPI-PCB
connection: &con9600
    accepter: tcp,192.168.178.167,4030
    options:
      kickolduser: true
    connector: serialdev,/dev/ttyUSB0,9600n81
    timeout: 10

Die eigentlich Aufzeichnung der Bilder erfolgt mit einem Raspberry Pi camera module 3 GS. Der Global Shutter ist zwar eigentlich für schnelle Aufnahmen gedacht, allerdings hat dieses Kameramodul einen Sony IMX296 Sensor mit einer Pixelgröße von 3.45 µm × 3.45 µm, womit sich sehr detailreiche Aufnahmen machen lassen. Die Verarbeitung erfolgt live mit einem KI-generierten Python-Script.

Das Script bietet Voreinstellungen (manual, moon, planet, deep sky) für verschiedene Beobachtungen. Die Voreinstellungen für Shutter / Gain werden im Automatikmodus basierend auf dem Histogramm permanent angepasst. Das jeweils aktuelle Bild wird als Vorschau angezeigt. Die erzeugten RAW-Dateien werden direkt in das .fits Format zum späteren Stacking umgewandelt. Ein Backup-Button kopiert sämtliche Daten auf einen USB-Stick und bereinigt die Verzeichnisse für die nächste Aufnahme. Das Stacking erfolgt am Rechner mittels SIRIL

# ✅ AstroStacker 1.3.1

import time
import shutil
import logging
import threading
import subprocess
from pathlib import Path
import tkinter as tk
from tkinter import messagebox
from datetime import datetime

import rawpy
from astropy.io import fits
from PIL import Image, ImageTk

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np

# ===== CONFIG =====
input_dir = Path("/home/pi/astrostacker/output")
output_dir = input_dir / "fits"
backup_dir = input_dir / "backup"
usb_mount = Path("/media/pi")

output_dir.mkdir(exist_ok=True)
backup_dir.mkdir(exist_ok=True)

# ===== LOGGING =====
logger = logging.getLogger("astro")
logger.setLevel(logging.INFO)


file_handler = logging.FileHandler(input_dir / "processing.log", mode="a")
file_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))

class TkLogHandler(logging.Handler):
    def emit(self, record):
        msg = self.format(record)

        def update():
            log_var.set(msg)

        try:
            root.after(0, update)  # thread-safe GUI update
        except Exception:
            pass

logger.addHandler(file_handler)
gui_handler = TkLogHandler()
gui_handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(gui_handler)
running = False
auto_exposure = True

# ===== PRESETS =====
presets = {
    "Manual": None,
    "Moon": (2000, 1),
    "Planet": (10000, 8),
    "Deep Sky": (500000, 16)
}

# ===== HISTOGRAM ANALYSIS =====
def analyze_histogram(image_path):
    img = Image.open(image_path).convert("L")
    arr = np.array(img)

    hist, _ = np.histogram(arr, bins=256, range=(0, 255))
    total = hist.sum()

    cdf = np.cumsum(hist) / total
    p10 = np.argmax(cdf > 0.10)
    p90 = np.argmax(cdf > 0.90)

    bright_pixels = np.sum(arr > 240)
    star_ratio = bright_pixels / arr.size

    return p10, p90, star_ratio, hist

# ===== AUTO EXPOSURE =====
def auto_adjust_exposure(image_path):
    try:

        preset = preset_var.get()

        # ✅ MANUAL MODE = no auto exposure
        if preset == "Manual":
            return

        p10, p90, star_ratio, _ = analyze_histogram(image_path)

        shutter = int(shutter_var.get())
        gain = int(gain_var.get())
        preset = preset_var.get()

        if p90 < 180:
            shutter = int(shutter * 1.2)
            gain = min(gain + 1, 30)
        elif p90 > 240:
            shutter = int(shutter * 0.7)
            gain = max(gain - 1, 1)

        if preset == "Deep Sky" and star_ratio > 0.01:
            shutter = int(shutter * 0.85)
            gain = max(gain - 1, 1)
        shutter = max(100, min(shutter, 2_000_000))

        shutter_var.set(str(shutter))
        gain_var.set(str(gain))

    except Exception as e:
        logger.error(e)


# ===== HISTOGRAM IMAGE =====
def draw_histogram(image_path):
    try:
        _, _, _, hist = analyze_histogram(image_path)

        plt.figure(figsize=(3, 1))
        plt.plot(hist)
        plt.tight_layout()

        path = input_dir / "hist.png"
        plt.savefig(path)
        plt.close()

        return path


    except Exception as e:
        logger.error(e)
        return None

# ===== PREVIEW =====
def update_preview():
    files = sorted(input_dir.glob("*.jpg"), key=lambda x: x.stat().st_mtime, reverse=True)
    if not files:
        return

    latest = files[0]

    try:
        img = Image.open(latest)
        img.thumbnail((450, 300))
        imgtk = ImageTk.PhotoImage(img)
        image_label.config(image=imgtk)
        image_label.image = imgtk

        filename_var.set(latest.name)

        hist_path = draw_histogram(latest)
        if hist_path:
            himg = Image.open(hist_path)
            himgtk = ImageTk.PhotoImage(himg)
            hist_label.config(image=himgtk)
            hist_label.image = himgtk

    except Exception as e:
        logger.error(e)


# ===== PROCESS LOOP =====
def process_loop():
    global running
    while running:
        for f in input_dir.glob("*.dng"):
            try:
                logger.info(f"Processing {f.name}")


                with rawpy.imread(str(f)) as raw:
                    rgb = raw.postprocess(use_camera_wb=True)
                    fits.PrimaryHDU(rgb).writeto(output_dir / (f.stem + ".fits"), overwrite=True)

                shutil.move(str(f), backup_dir / f.name)
                logger.info(f"Finished {f.name}")


            except Exception as e:
                logger.error(e)


        update_preview()
        time.sleep(1)

# ===== CAMERA LOOP =====
def capture_loop():
    global running
    k = 0

    while running:
        try:
            shutter = int(shutter_var.get())
            gain = int(gain_var.get())

            path = f"/home/pi/astrostacker/output/img_{k}_{int(time.time())}.jpg"

            cmd = (
                f'rpicam-still -o "{path}" '
                f'--shutter {shutter} --gain {gain} --raw -n '
                f'--awbgains 1,1 --denoise off'
            )

            logger.info(f"Capture {k} | shutter={shutter} gain={gain}")

            start = time.time()
            subprocess.run(cmd, shell=True)
            elapsed = time.time() - start

            if auto_exposure:
                auto_adjust_exposure(path)

            k += 1

            delay = max((shutter / 1e6) * 1.1 - elapsed, 0)
            time.sleep(delay)

        except Exception as e:
            logger.error(e)


# ===== CONTROLS =====
def start():
    global running
    if not running:
        logger.info("START pressed")
        running = True
        threading.Thread(target=process_loop, daemon=True).start()
        threading.Thread(target=capture_loop, daemon=True).start()



def stop():
    global running
    logger.info("STOP pressed")
    running = False



def apply_preset(choice):
    if presets.get(choice):
        s, g = presets[choice]
        shutter_var.set(str(s))
        gain_var.set(str(g))

    # ✅ Auto exposure behavior
    if choice == "Manual":
        auto_var.set(False)
        toggle_auto()
        logger.info("Manual mode → auto exposure OFF")
    else:
        auto_var.set(True)
        toggle_auto()
        logger.info(f"{choice} preset → auto exposure ON")


def toggle_auto():
    global auto_exposure
    auto_exposure = auto_var.get()

# ===== BACKUP =====
def backup():
    def run():
        try:
            usb_devices = list(usb_mount.glob("*"))
            if not usb_devices:
                raise Exception("No USB device found!")

            usb = usb_devices[0]
            target = usb / f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

            logger.info(f"USB mount detected: {usb}")
            logger.info(f"Backup target directory: {target}")

            source = Path("/home/pi")

            logger.info("===== BACKUP SESSION START =====")
            logger.info(f"Source: /home/pi")
            logger.info(f"Destination: {target}")
            usage = shutil.disk_usage(usb)
            logger.info(f"USB free space: {usage.free // (1024*1024)} MB")

            target.mkdir(parents=True, exist_ok=True)


            # ✅ copy file-by-file with progress logging
            files = list(source.rglob("*"))
            total = len(files)


            for i, f in enumerate(files):
                try:
                    rel = f.relative_to(source)
                    dest = target / rel


                    if f.is_dir():
                        dest.mkdir(exist_ok=True)
                    else:
                        shutil.copy2(f, dest)


                    # ✅ progress update
                    logger.info(f"Backup {i+1}/{total}: {rel}")


                except Exception as e:
                    logger.error(f"Backup error: {f} -> {e}")


            logger.info("Backup complete")
            logger.info("===== BACKUP SESSION END =====")
            # ✅ CLEAN OUTPUT FOLDERS (keep structure)
            try:
                base = Path("/home/pi/astrostacker/output")
                # clean output root (files only)
                for item in base.iterdir():
                    if item.is_file():
                        item.unlink()

                # clean fits + backup content only
                for sub in [base / "fits", base / "backup"]:
                    if sub.exists():
                        for item in sub.iterdir():
                            if item.is_file():
                                item.unlink()
                            elif item.is_dir():
                                shutil.rmtree(item)

                logger.info("Output folders cleaned (fits + backup preserved)")
            except Exception as e:
                logger.error(f"Cleanup failed: {e}")
            messagebox.showinfo("Done", "Backup + cleanup complete")




        except Exception as e:
            logger.error(e)
            messagebox.showerror("Error", str(e))

    threading.Thread(target=run, daemon=True).start()

# ===== GUI =====
root = tk.Tk()
root.geometry("800x480")
root.title("Astrostacker PRO")

root.columnconfigure(0, weight=2)
root.columnconfigure(1, weight=1)

left = tk.Frame(root)
left.grid(row=0, column=0, sticky="nsew")

right = tk.Frame(root)
right.grid(row=0, column=1, sticky="nsew")

# LEFT
image_label = tk.Label(left, bg="black")
image_label.pack(fill="both", expand=True)

filename_var = tk.StringVar(value="No image")
tk.Label(left, textvariable=filename_var).pack()

log_var = tk.StringVar(value="No log yet")
tk.Label(left, textvariable=log_var, fg="blue", wraplength=400, justify="left").pack()

# RIGHT
shutter_var = tk.StringVar(value="10000")
gain_var = tk.StringVar(value="16")

tk.Label(right, text="Preset").pack()
preset_var = tk.StringVar(value="Manual")
tk.OptionMenu(right, preset_var, *presets.keys(), command=apply_preset).pack()

tk.Label(right, text="Shutter").pack()
tk.Entry(right, textvariable=shutter_var).pack()

tk.Label(right, text="Gain").pack()
tk.Entry(right, textvariable=gain_var).pack()

auto_var = tk.BooleanVar(value=True)
tk.Checkbutton(right, text="Auto Exposure", variable=auto_var, command=toggle_auto).pack()

btns = tk.Frame(right)
btns.pack(pady=10)

tk.Button(btns, text="START", bg="green", fg="white", width=10, height=2, command=start).grid(row=0, column=0)
tk.Button(btns, text="STOP", bg="red", fg="white", width=10, height=2, command=stop).grid(row=0, column=1)

tk.Button(right, text="Backup", command=backup).pack(fill="x", pady=5)

# Histogram
hist_label = tk.Label(right)
hist_label.pack()

root.mainloop()

Ein zweites KI-generiertes Python-Script wird zur Videoaufzeichnung genutzt. Hier wird ebenfalls eine LIVE-Vorschau angezeigt. Mit den Buttons START / STOP erfolgt die Steuerung der Aufnahme.

# ✅ AstroVideo 1.0.1

import tkinter as tk
from PIL import Image, ImageTk
from datetime import datetime
import os
import threading
import time

from picamera2 import Picamera2
from picamera2.encoders import H264Encoder
from picamera2.outputs import FileOutput

class CameraApp:
    def __init__(self, root):
        self.root = root
        self.root.title("Camera Recorder")

        # ✅ Fixed window size
        self.root.geometry("800x480")
        self.root.resizable(False, False)

        # ✅ Save path
        self.save_path = "/home/pi/astrostacker/output"
        os.makedirs(self.save_path, exist_ok=True)

        # ✅ Camera setup
        self.picam2 = Picamera2()
        config = self.picam2.create_preview_configuration(
            main={"size": (800, 350)}
        )
        self.picam2.configure(config)
        self.picam2.start()

        # small warm-up (important for stability)
        time.sleep(0.5)

        self.recording = False
        self.encoder = None
        self.output = None

        # ✅ Layout
        self.preview_frame = tk.Frame(root, width=800, height=350)
        self.preview_frame.pack_propagate(False)
        self.preview_frame.pack()

        self.control_frame = tk.Frame(root, height=130)
        self.control_frame.pack(fill="both", expand=True)

        # Preview display
        self.label = tk.Label(self.preview_frame)
        self.label.pack(fill="both", expand=True)

        # Buttons
        self.start_button = tk.Button(
            self.control_frame,
            text="Start Recording",
            command=self.start_recording,
            bg="green",
            fg="white",
            width=20,
            height=2
        )
        self.start_button.pack(side="left", padx=20, pady=20)

        self.stop_button = tk.Button(
            self.control_frame,
            text="Stop Recording",
            command=self.stop_recording_thread,
            bg="red",
            fg="white",
            width=20,
            height=2
        )
        self.stop_button.pack(side="right", padx=20, pady=20)

        # Status
        self.status_label = tk.Label(self.control_frame, text="Status: Idle")
        self.status_label.pack(pady=5)

        # Start preview loop
        self.update_preview()

    # ✅ Live preview
    def update_preview(self):
        frame = self.picam2.capture_array()
        img = Image.fromarray(frame).resize((800, 350))

        imgtk = ImageTk.PhotoImage(image=img)
        self.label.imgtk = imgtk
        self.label.config(image=imgtk)

        self.root.after(30, self.update_preview)

    # ✅ Start recording (NON-BLOCKING)
    def start_recording(self):
        if not self.recording:
            filename = datetime.now().strftime("video_%Y%m%d_%H%M%S.h264")
            self.filepath = os.path.join(self.save_path, filename)

            self.encoder = H264Encoder()
            self.output = FileOutput(self.filepath)

            # ✅ Use encoder instead of start_recording
            self.picam2.start_encoder(self.encoder, self.output)

            self.recording = True
            self.status_label.config(text=f"Recording: {filename}")

            print(f"Recording started: {self.filepath}")

    # ✅ Stop button handler (threaded)
    def stop_recording_thread(self):
        if self.recording:
            self.status_label.config(text="Stopping...")

            self.start_button.config(state="disabled")
            self.stop_button.config(state="disabled")

            threading.Thread(target=self.stop_recording, daemon=True).start()

    # ✅ Stop encoder safely (NO FREEZE)
    def stop_recording(self):
        try:
            self.picam2.stop_encoder()
            print("Encoder stopped")
        except Exception as e:
            print("Stop error:", e)

        self.recording = False

        # ✅ Back to GUI thread
        self.root.after(0, self.finish_stop_ui)

    def finish_stop_ui(self):
        self.status_label.config(text="Saved to /home/pi/Videos")
        self.start_button.config(state="normal")
        self.stop_button.config(state="normal")

        print("Recording finished")

# ✅ Run app
if __name__ == "__main__":
    root = tk.Tk()
    app = CameraApp(root)
    root.mainloop()