# ===============================================...
बनाया गया: 8 जून 2026
का उपयोग करके उत्तर दिया गया GPT-5.5 Thinking द्वारा Chat01
बनाया गया: 8 जून 2026
का उपयोग करके उत्तर दिया गया GPT-5.5 Thinking द्वारा Chat01
import os
import numpy as np
import pandas as pd
# Choose the input CSV. Inside Google Colab the user is asked to upload a file
# and the first uploaded name is used; everywhere else the default file name in
# the working directory is used.
try:
    from google.colab import files
    print('Silakan upload file CSV Anda...')
    uploaded = files.upload()
    CSV_PATH = next(iter(uploaded.keys()))
except Exception:
    # Deliberate best-effort fallback: the import fails outside Colab, and
    # next() raises StopIteration when nothing was uploaded.
    CSV_PATH = 'PRIZE_LEVEL_4_ELITE.csv'
print('Membaca file:', CSV_PATH)
# raw_number is read as text so leading zeros (e.g. "0042") survive parsing.
df = pd.read_csv(CSV_PATH, dtype={'raw_number': str})
def prepare_data(data):
    """Clean the raw draw table and derive per-draw digit features.

    Works on a copy; the caller's frame is not modified.

    Args:
        data: DataFrame with at least the columns 'timestamp' and
            'raw_number'.

    Returns:
        A new DataFrame sorted by parsed timestamp (ties keep their original
        file order) with a fresh 0..n-1 index and these added columns:
        '_orig_order', 'ts', 'hour' ('HH:MM'), 'd1'..'d4' (the four digits),
        'sum_digits', 'odd_count', 'high_count' (digits >= 5), 'is_double',
        'is_triple' and 'is_quad'.

    Raises:
        ValueError: if 'timestamp' or 'raw_number' is missing.
    """
    data = data.copy()
    if 'timestamp' not in data.columns or 'raw_number' not in data.columns:
        raise ValueError("CSV wajib punya kolom 'timestamp' dan 'raw_number'.")

    # Remember the file order so equal timestamps sort deterministically.
    data['_orig_order'] = np.arange(len(data))

    # Keep digits only, left-pad to four characters, keep the last four.
    data['raw_number'] = (
        data['raw_number']
        .astype(str)
        .str.replace(r'\D', '', regex=True)
        .str.zfill(4)
        .str[-4:]
    )

    data['ts'] = pd.to_datetime(data['timestamp'])
    data = data.sort_values(['ts', '_orig_order'], kind='mergesort').reset_index(drop=True)
    data['hour'] = data['ts'].dt.strftime('%H:%M')

    for j in range(4):
        data[f'd{j+1}'] = data['raw_number'].str[j].astype(int)

    digit_cols = ['d1', 'd2', 'd3', 'd4']
    data['sum_digits'] = data[digit_cols].sum(axis=1)
    data['odd_count'] = (data[digit_cols] % 2).sum(axis=1)
    data['high_count'] = (data[digit_cols] >= 5).sum(axis=1)

    # The pattern flags are defined purely by the number of distinct digits:
    #   is_double -> fewer than 4 distinct digits (any repeat at all)
    #   is_triple -> at most 2 distinct digits (so two-pair numbers such as
    #                "1122" are flagged as well as real triples)
    #   is_quad   -> exactly 1 distinct digit
    unique_counts = data['raw_number'].apply(lambda s: len(set(s)))
    data['is_double'] = (unique_counts < 4).astype(int)
    data['is_triple'] = (unique_counts <= 2).astype(int)
    data['is_quad'] = (unique_counts == 1).astype(int)
    return data
# Replace the raw table with its cleaned, feature-enriched version.
df = prepare_data(df)

# Candidate universe: every 4-digit string "0000".."9999". The position of a
# candidate in these arrays equals its integer value.
NUMS = np.array([str(value).zfill(4) for value in range(10000)])
# One row per candidate, one column per digit position.
DIGITS = np.array([list(map(int, text)) for text in NUMS], dtype=np.int16)
# Per-candidate digit sum, count of odd digits, count of digits >= 5.
SUMS = DIGITS.sum(axis=1)
ODD = (DIGITS % 2).sum(axis=1)
HIGH = (DIGITS >= 5).sum(axis=1)
# Number of distinct digits in each candidate.
UNIQUE_COUNTS_ARRAY = np.array([len(set(row.tolist())) for row in DIGITS])
# TRANSITION_PROBS[j - 1][a, b] estimates P(digit at position j+1 == b | digit
# at position j == a) WITHIN one number, for j = 1..3. The table is looked up
# with two digits of the same candidate, so the counts pair two positions of
# the same row (not a row with the row after it).
# NOTE: the counts use every row of df, so any backtest that reads this table
# sees draws from after the evaluated row.
TRANSITION_PROBS = []
for j in range(1, 4):
    matrix = np.zeros((10, 10))
    src = df[f'd{j}'].to_numpy(dtype=int)
    dst = df[f'd{j+1}'].to_numpy(dtype=int)
    # Unbuffered add so repeated (src, dst) pairs are all counted.
    np.add.at(matrix, (src, dst), 1)
    # Additive smoothing keeps unseen pairs at a small non-zero probability.
    matrix += 0.1
    matrix /= matrix.sum(axis=1, keepdims=True)
    TRANSITION_PROBS.append(matrix)
def get_gap_matrix(hist):
    """Return how many draws ago each digit last appeared at each position.

    Args:
        hist: DataFrame with integer digit columns 'd1'..'d4', ordered oldest
            to newest. The index labels are ignored; only row order matters.

    Returns:
        Float array of shape (4, 10). Entry [j, digit] is
        ``len(hist) - last_row_position`` for the most recent row whose digit
        at position j+1 equals ``digit`` (so 1 means "seen in the latest
        row"), or ``len(hist)`` if that digit never appeared there.
    """
    n_rows = len(hist)
    # Start from the "never seen" value and overwrite where a digit occurs.
    gap_matrix = np.full((4, 10), n_rows, dtype=np.float64)
    for j in range(4):
        column = hist[f'd{j+1}'].to_numpy()
        for digit in range(10):
            # Row positions rather than index labels, so a sliced or
            # re-labelled history still yields correct gaps.
            positions = np.flatnonzero(column == digit)
            if positions.size:
                gap_matrix[j, digit] = n_rows - positions[-1]
    return gap_matrix
def score_candidates(hist, prev_row, target_hour, weights):
    """Score all 10,000 four-digit candidates against a draw history.

    Each enabled term adds ``weight * log(value)`` to a per-candidate score;
    a term whose weight is missing or zero is skipped.

    Args:
        hist: history of draws as produced by prepare_data, oldest first.
            Only these rows are used for the statistics.
        prev_row: the most recent draw. Accepted for interface compatibility;
            no scoring term reads it.
        target_hour: 'HH:MM' slot of the draw being scored. Accepted for
            interface compatibility; no scoring term reads it.
        weights: mapping with optional keys 'freq', 'gap', 'corr', 'sum',
            'odd_even', 'pattern' (term weights) and 'recent_n' (window size
            for the recent-frequency term, default 300).

    Returns:
        Float array of length len(NUMS); entry i is the score of candidate
        NUMS[i]. Higher is better.
    """
    logscore = np.zeros(len(NUMS), dtype=np.float64)
    pos_cols = ['d1', 'd2', 'd3', 'd4']
    n_hist = len(hist)
    recent_n = max(1, int(weights.get('recent_n', 300)))

    # 1. Frequency score: smoothed per-position digit frequency, over the
    #    whole history and over the most recent window.
    if weights.get('freq', 0):
        h_recent = hist.tail(recent_n)
        for j, col in enumerate(pos_cols):
            counts_g = np.bincount(hist[col].to_numpy(dtype=int), minlength=10)
            logscore += weights['freq'] * np.log((counts_g[DIGITS[:, j]] + 1) / (n_hist + 10))

            counts_r = np.bincount(h_recent[col].to_numpy(dtype=int), minlength=10)
            # Divide by the rows actually in the window, which is smaller than
            # recent_n while the history is still short.
            logscore += weights['freq'] * np.log((counts_r[DIGITS[:, j]] + 1) / (len(h_recent) + 10))

    # 2. Gap analysis score: digits not seen for longer score higher.
    if weights.get('gap', 0):
        gaps = get_gap_matrix(hist)
        for j in range(4):
            digit_gaps = gaps[j, DIGITS[:, j]]
            logscore += weights['gap'] * np.log1p(digit_gaps)

    # 3. Correlation score: P(digit at position j+1 | digit at position j)
    #    within a number, estimated from hist only so a walk-forward backtest
    #    never sees later draws.
    if weights.get('corr', 0):
        for j in range(3):
            trans = np.full((10, 10), 0.1, dtype=np.float64)
            src = hist[pos_cols[j]].to_numpy(dtype=int)
            dst = hist[pos_cols[j + 1]].to_numpy(dtype=int)
            np.add.at(trans, (src, dst), 1.0)
            trans /= trans.sum(axis=1, keepdims=True)
            trans_p = trans[DIGITS[:, j], DIGITS[:, j + 1]]
            logscore += weights['corr'] * np.log(trans_p + 1e-9)

    # 4. Sum and odd/even distributions.
    if weights.get('sum', 0):
        counts = np.bincount(hist['sum_digits'].to_numpy(dtype=int), minlength=37)
        logscore += weights['sum'] * np.log((counts[SUMS] + 1) / (n_hist + 37))

    if weights.get('odd_even', 0):
        counts = np.bincount(hist['odd_count'].to_numpy(dtype=int), minlength=5)
        logscore += weights['odd_even'] * np.log((counts[ODD] + 1) / (n_hist + 5))

    # 5. Pattern score: fixed prior that down-weights candidates with at most
    #    two distinct digits (uses the precomputed array).
    if weights.get('pattern', 0):
        logscore += weights['pattern'] * np.log(np.where(UNIQUE_COUNTS_ARRAY <= 2, 0.1, 0.5))

    return logscore
# Named weight presets for score_candidates. Every preset carries one weight
# per scoring term plus 'recent_n', the window size of the recent-frequency
# term. Insertion order is the order in which the presets are backtested.
WEIGHT_SETS = {
    # Gap and correlation terms weighted highest.
    'balanced_pro': {
        'freq': 0.8,
        'gap': 1.2,
        'corr': 1.5,
        'sum': 0.5,
        'odd_even': 0.3,
        'pattern': 0.4,
        'recent_n': 300,
    },
    # Gap term dominates.
    'gap_hunter': {
        'freq': 0.5,
        'gap': 2.5,
        'corr': 0.5,
        'sum': 0.2,
        'odd_even': 0.2,
        'pattern': 0.2,
        'recent_n': 200,
    },
    # Correlation and pattern terms dominate.
    'pattern_master': {
        'freq': 0.6,
        'gap': 0.5,
        'corr': 2.0,
        'sum': 0.8,
        'odd_even': 0.5,
        'pattern': 1.5,
        'recent_n': 400,
    },
}
def infer_next_timestamp(data, slot_order=None):
    """Guess the timestamp and 'HH:MM' slot of the draw after the last row.

    Args:
        data: DataFrame with a datetime column 'ts', oldest row first.
        slot_order: daily draw times as 'HH:MM' strings in chronological
            order. Defaults to 03:30, 06:30, 11:30, 13:30, 17:30, 21:30.

    Returns:
        Tuple ``(next_ts, next_hour)``: a pandas Timestamp and its 'HH:MM'
        string.

        If the last row falls on a known slot, the next slot is returned
        (rolling over to the following day after the last slot). Otherwise
        the median spacing between distinct timestamps is added to the last
        timestamp; when there is no spacing to measure, six hours are added.
    """
    if slot_order is None:
        slot_order = ['03:30', '06:30', '11:30', '13:30', '17:30', '21:30']

    last_ts = data['ts'].iloc[-1]
    last_hour = last_ts.strftime('%H:%M')

    if last_hour in slot_order:
        idx = slot_order.index(last_hour)
        next_hour = slot_order[(idx + 1) % len(slot_order)]
        next_date = last_ts.date()
        if idx == len(slot_order) - 1:
            # The last slot of the day wraps to the first slot of tomorrow.
            next_date = (last_ts + pd.Timedelta(days=1)).date()
        return pd.Timestamp(str(next_date) + ' ' + next_hour + ':00'), next_hour

    # Off-schedule data: extrapolate with the typical spacing between draws.
    unique_ts = data['ts'].drop_duplicates().sort_values()
    gap = unique_ts.diff().dropna().median()
    if pd.isna(gap):
        # A single distinct timestamp has no spacing; without this fallback
        # next_ts would be NaT and strftime would fail.
        gap = pd.Timedelta(hours=6)
    next_ts = last_ts + gap
    return next_ts, next_ts.strftime('%H:%M')
def evaluate_weights(data, weights, last_n=300, min_train=100):
    """Walk-forward backtest of one weight preset.

    For each evaluated row i, all candidates are scored using only rows
    before i, and the rank of the number actually drawn at row i is recorded
    (rank 1 = highest score; candidates with a strictly higher score count
    against it).

    Args:
        data: prepared draw table, oldest first.
        weights: weight preset passed to score_candidates.
        last_n: evaluate at most this many of the latest rows.
        min_train: never evaluate a row with fewer than this many earlier
            rows of history.

    Returns:
        Dict with 'top10' (fraction of evaluated draws ranked in the top 10)
        and 'med_rank' (median rank of the drawn number).

    Raises:
        ValueError: if no row is left to evaluate.
    """
    # At least one earlier row is required because data.iloc[i - 1] is read.
    start = max(min_train, len(data) - last_n, 1)
    if start >= len(data):
        raise ValueError('Data terlalu sedikit untuk backtest.')

    hits_top10 = 0
    ranks = []
    for i in range(start, len(data)):
        hist = data.iloc[:i]
        target = data.iloc[i]
        prev = data.iloc[i - 1]
        scores = score_candidates(hist, prev, target['hour'], weights)
        # A candidate's array position equals its integer value.
        actual = int(target['raw_number'])
        rank = int(1 + np.sum(scores > scores[actual]))
        ranks.append(rank)
        if rank <= 10:
            hits_top10 += 1

    n = len(ranks)
    return {'top10': hits_top10 / n, 'med_rank': np.median(ranks)}
# Backtest every weight preset, then keep the best one.
print('\n===== BACKTESTING MODEL PRO =====')
results = []
for name, w in WEIGHT_SETS.items():
    res = evaluate_weights(df, w)
    results.append({'model': name, **res})
    print(f"Model {name}: Top10 Accuracy: {res['top10']:.2%}, Median Rank: {res['med_rank']}")

# Highest top-10 hit rate wins. Ties are broken by the lower median rank, and
# a stable sort keeps the preset order for anything still tied.
best_model_name = (
    pd.DataFrame(results)
    .sort_values(['top10', 'med_rank'], ascending=[False, True], kind='mergesort')
    .iloc[0]['model']
)
best_weights = WEIGHT_SETS[best_model_name]
print(f'\n>>> Model Terpilih: {best_model_name}')
# Target draw: the slot that follows the last row of the history.
next_ts, next_hour = infer_next_timestamp(df)
print(f'\nTarget Timestamp: {next_ts} ({next_hour})')

hist = df.copy()
prev = hist.iloc[-1]
main_predictions = []
all_results = []
for step in range(1, 4):
    scores = score_candidates(hist, prev, next_hour, best_weights)

    # Winners of earlier steps are pushed to the bottom so each step yields a
    # different main prediction.
    for p in main_predictions:
        scores[int(p)] = -1e18

    top_indices = np.argsort(-scores)[:20]
    step_rows = []
    for rank, idx in enumerate(top_indices, 1):
        step_rows.append({
            'step': step,
            'rank': rank,
            'number': NUMS[idx],
            'score': round(scores[idx], 4)
        })
    current_step_df = pd.DataFrame(step_rows)
    all_results.append(current_step_df)

    winner = str(NUMS[top_indices[0]])
    main_predictions.append(winner)

    # Simulation for next step: append the winner to the history as if it had
    # been drawn. Every simulated row reuses the same target timestamp.
    new_row = {col: np.nan for col in hist.columns}
    new_row.update({
        'timestamp': str(next_ts),
        'raw_number': winner,
        'ts': next_ts,
        'hour': next_hour,
        # Keep the ordering column populated instead of leaving it as NaN.
        '_orig_order': len(hist),
    })
    for j, c in enumerate(winner):
        new_row[f'd{j+1}'] = int(c)
    new_row['sum_digits'] = sum(int(c) for c in winner)
    new_row['odd_count'] = sum(int(c) % 2 for c in winner)
    new_row['high_count'] = sum(1 for c in winner if int(c) >= 5)
    new_row['is_double'] = 1 if len(set(winner)) < 4 else 0
    new_row['is_triple'] = 1 if len(set(winner)) <= 2 else 0
    new_row['is_quad'] = 1 if len(set(winner)) == 1 else 0

    hist = pd.concat([hist, pd.DataFrame([new_row])], ignore_index=True)
    prev = hist.iloc[-1]
# Report the main prediction of every step.
print('\n===== HASIL PREDIKSI UTAMA =====')
for position, number in enumerate(main_predictions, 1):
    print(f"Urutan {position}: {number}")

# Print and save the top-20 candidate table of all steps.
print('\n===== DAFTAR KANDIDAT TOP 20 =====')
full_output = pd.concat(all_results)
print(full_output.to_string(index=False))
full_output.to_csv('prediksi_pro_4d_fixed.csv', index=False)

# Offer the CSV as a browser download when running in Google Colab.
try:
    from google.colab import files
    files.download('prediksi_pro_4d_fixed.csv')
except Exception:
    # Best-effort only: outside Colab the import fails and the file simply
    # stays on disk. Unlike a bare except, Ctrl-C is not swallowed.
    pass
# Follow-up user message in the transcript, originally fused onto the line above: "upgrade"
# Notebook cell: confirm the uploaded CSV exists, then inspect its shape,
# dtypes, columns and first/last rows.
import json
import os
import pathlib
import statistics
import sys
import textwrap

import numpy as np
import pandas as pd

path = '/mnt/data/PRIZE_LEVEL_4_ELITE.csv'
print(os.path.exists(path), os.path.getsize(path) if os.path.exists(path) else None)
# raw_number is read as text so leading zeros are preserved.
df = pd.read_csv(path, dtype={'raw_number': str})
print(df.shape)
print(df.head())
print(df.dtypes)
print(df.columns.tolist())
print(df.tail())
# Notebook cell: write the upgraded prediction script to disk, then print its
# size and first 500 characters. The script is held in a raw string so the
# backslash escapes inside it reach the file unchanged.
from pathlib import Path

script = r'''# ============================================================
# PREDIKSI 4D - ADVANCED PRO UPGRADE
# Fokus upgrade:
# - Backtest walk-forward tanpa data leakage
# - Transition matrix dihitung dari histori saat itu, bukan full data masa depan
# - target_hour dan prev_row benar-benar dipakai
# - Scoring dinormalisasi agar bobot lebih stabil
# - Output CSV + ringkasan model
#
# Catatan penting:
# Script ini hanya analisis pola historis/eksperimen statistik.
# Tidak ada metode yang bisa menjamin hasil undian/angka acak.
# ============================================================

import argparse
import os
import sys
import warnings
from dataclasses import dataclass
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=RuntimeWarning)

# -----------------------------
# 0) CONFIG
# -----------------------------
DEFAULT_SLOT_ORDER = ["03:30", "06:30", "11:30", "13:30", "17:30", "21:30"]
DEFAULT_OUTPUT = "prediksi_pro_4d_upgrade.csv"
DEFAULT_SUMMARY = "ringkasan_backtest_4d_upgrade.csv"
EPS = 1e-12

WEIGHT_SETS: Dict[str, Dict[str, float]] = {
    "balanced_v2": {
        "freq_global": 0.75,
        "freq_recent": 0.85,
        "freq_decay": 0.75,
        "gap": 0.85,
        "intra_corr": 0.90,
        "seq_corr": 0.95,
        "sum": 0.45,
        "odd_even": 0.35,
        "high_low": 0.30,
        "hour": 0.45,
        "pattern": 0.35,
        "repeat_penalty": 0.60,
        "recent_n": 300,
        "repeat_lookback": 80,
        "decay_halflife": 220,
    },
    "gap_hunter_v2": {
        "freq_global": 0.35,
        "freq_recent": 0.45,
        "freq_decay": 0.35,
        "gap": 1.65,
        "intra_corr": 0.45,
        "seq_corr": 0.45,
        "sum": 0.20,
        "odd_even": 0.20,
        "high_low": 0.15,
        "hour": 0.25,
        "pattern": 0.20,
        "repeat_penalty": 0.80,
        "recent_n": 220,
        "repeat_lookback": 120,
        "decay_halflife": 180,
    },
    "transition_v2": {
        "freq_global": 0.45,
        "freq_recent": 0.55,
        "freq_decay": 0.40,
        "gap": 0.35,
        "intra_corr": 1.35,
        "seq_corr": 1.50,
        "sum": 0.35,
        "odd_even": 0.35,
        "high_low": 0.25,
        "hour": 0.35,
        "pattern": 0.30,
        "repeat_penalty": 0.55,
        "recent_n": 360,
        "repeat_lookback": 80,
        "decay_halflife": 260,
    },
    "hour_pattern_v2": {
        "freq_global": 0.45,
        "freq_recent": 0.65,
        "freq_decay": 0.65,
        "gap": 0.45,
        "intra_corr": 0.75,
        "seq_corr": 0.70,
        "sum": 0.55,
        "odd_even": 0.50,
        "high_low": 0.40,
        "hour": 1.20,
        "pattern": 0.65,
        "repeat_penalty": 0.55,
        "recent_n": 420,
        "repeat_lookback": 90,
        "decay_halflife": 300,
    },
}


@dataclass(frozen=True)
class Universe:
    nums: np.ndarray
    digits: np.ndarray
    sums: np.ndarray
    odd: np.ndarray
    high: np.ndarray
    unique_counts: np.ndarray


def make_universe() -> Universe:
    nums = np.array([f"{i:04d}" for i in range(10000)])
    digits = np.array([[int(c) for c in s] for s in nums], dtype=np.int16)
    sums = digits.sum(axis=1)
    odd = (digits % 2).sum(axis=1)
    high = (digits >= 5).sum(axis=1)
    unique_counts = np.array([len(set(s)) for s in nums], dtype=np.int8)
    return Universe(nums=nums, digits=digits, sums=sums, odd=odd, high=high, unique_counts=unique_counts)


UNIVERSE = make_universe()
POS_COLS = ["d1", "d2", "d3", "d4"]


# -----------------------------
# 1) LOAD CSV
# -----------------------------
def resolve_csv_path(default_path: str) -> str:
    """Colab-friendly loader with CLI fallback."""
    if os.path.exists(default_path):
        return default_path

    # Fallback khusus Google Colab
    try:
        from google.colab import files  # type: ignore

        print("File tidak ditemukan di path default.")
        print("Silakan upload file CSV Anda...")
        uploaded = files.upload()
        return next(iter(uploaded.keys()))
    except Exception:
        return default_path


# -----------------------------
# 2) CLEANING & FEATURE ENGINEERING
# -----------------------------
def prepare_data(data: pd.DataFrame) -> pd.DataFrame:
    data = data.copy()
    required = {"timestamp", "raw_number"}
    missing = required - set(data.columns)
    if missing:
        raise ValueError(f"CSV wajib punya kolom: {sorted(required)}. Kolom hilang: {sorted(missing)}")

    data["_orig_order"] = np.arange(len(data), dtype=np.int64)
    data["raw_number"] = (
        data["raw_number"]
        .astype(str)
        .str.replace(r"\D", "", regex=True)
        .str.zfill(4)
        .str[-4:]
    )
    data["ts"] = pd.to_datetime(data["timestamp"], errors="coerce")
    bad_ts = int(data["ts"].isna().sum())
    if bad_ts:
        print(f"Warning: {bad_ts} baris timestamp invalid dibuang.")
        data = data.dropna(subset=["ts"]).copy()

    data = data.sort_values(["ts", "_orig_order"], kind="mergesort").reset_index(drop=True)
    data["hour"] = data["ts"].dt.strftime("%H:%M")

    for j in range(4):
        data[f"d{j + 1}"] = data["raw_number"].str[j].astype(int)

    digit_cols = ["d1", "d2", "d3", "d4"]
    data["sum_digits"] = data[digit_cols].sum(axis=1)
    data["odd_count"] = (data[digit_cols] % 2).sum(axis=1)
    data["high_count"] = (data[digit_cols] >= 5).sum(axis=1)

    unique_counts = data["raw_number"].apply(lambda s: len(set(s)))
    data["unique_count"] = unique_counts.astype(int)
    data["is_double"] = (unique_counts < 4).astype(int)
    data["is_triple_or_quad"] = (unique_counts <= 2).astype(int)
    data["is_quad"] = (unique_counts == 1).astype(int)
    return data


# -----------------------------
# 3) NUMERIC HELPERS
# -----------------------------
def robust_z(values: np.ndarray) -> np.ndarray:
    """Robust z-score agar skala fitur tidak mendominasi karena beda satuan."""
    values = np.asarray(values, dtype=np.float64)
    med = np.nanmedian(values)
    mad = np.nanmedian(np.abs(values - med))
    if not np.isfinite(mad) or mad < EPS:
        std = np.nanstd(values)
        if not np.isfinite(std) or std < EPS:
            return np.zeros_like(values, dtype=np.float64)
        return (values - np.nanmean(values)) / (std + EPS)
    return (values - med) / (1.4826 * mad + EPS)


def logprob_from_counts(counts: np.ndarray, idx: np.ndarray, denom: float, alpha_bins: int) -> np.ndarray:
    return np.log((counts[idx] + 1.0) / (denom + alpha_bins))


def weighted_bincount(values: np.ndarray, weights: np.ndarray, minlength: int) -> np.ndarray:
    return np.bincount(values.astype(int), weights=weights, minlength=minlength).astype(np.float64)


def top_k_indices(scores: np.ndarray, k: int) -> np.ndarray:
    k = int(min(k, len(scores)))
    idx = np.argpartition(-scores, k - 1)[:k]
    return idx[np.argsort(-scores[idx])]


# -----------------------------
# 4) MATRIX BUILDERS - NO LEAKAGE
# -----------------------------
def get_gap_matrix_fast(hist: pd.DataFrame) -> np.ndarray:
    gap_matrix = np.zeros((4, 10), dtype=np.float64)
    n = len(hist)
    for j, col in enumerate(POS_COLS):
        arr = hist[col].to_numpy(dtype=np.int16)
        last_seen = np.full(10, -1, dtype=np.int64)
        for pos, digit in enumerate(arr):
            last_seen[int(digit)] = pos
        gaps = np.where(last_seen >= 0, n - 1 - last_seen, n)
        gap_matrix[j] = gaps
    return gap_matrix


def build_intra_transition_probs(hist: pd.DataFrame, alpha: float = 0.20) -> List[np.ndarray]:
    """P(digit posisi j+1 | digit posisi j), dihitung hanya dari histori."""
    matrices: List[np.ndarray] = []
    for j in range(3):
        src = hist[f"d{j + 1}"].to_numpy(dtype=np.int16)
        dst = hist[f"d{j + 2}"].to_numpy(dtype=np.int16)
        matrix = np.full((10, 10), alpha, dtype=np.float64)
        np.add.at(matrix, (src, dst), 1.0)
        matrix /= matrix.sum(axis=1, keepdims=True)
        matrices.append(matrix)
    return matrices


def build_seq_transition_probs(hist: pd.DataFrame, alpha: float = 0.20) -> List[np.ndarray]:
    """P(digit saat ini di posisi j | digit draw/baris sebelumnya di posisi j)."""
    matrices: List[np.ndarray] = []
    if len(hist) < 2:
        return [np.full((10, 10), 0.1, dtype=np.float64) for _ in range(4)]

    for j in range(4):
        src = hist[f"d{j + 1}"].iloc[:-1].to_numpy(dtype=np.int16)
        dst = hist[f"d{j + 1}"].iloc[1:].to_numpy(dtype=np.int16)
        matrix = np.full((10, 10), alpha, dtype=np.float64)
        np.add.at(matrix, (src, dst), 1.0)
        matrix /= matrix.sum(axis=1, keepdims=True)
        matrices.append(matrix)
    return matrices


# -----------------------------
# 5) CORE SCORING ENGINE - UPGRADED
# -----------------------------
def score_candidates(
    hist: pd.DataFrame,
    prev_row: pd.Series,
    target_hour: str,
    weights: Dict[str, float],
    normalize_features: bool = True,
) -> np.ndarray:
    digits = UNIVERSE.digits
    score = np.zeros(len(UNIVERSE.nums), dtype=np.float64)

    def add_feature(name: str, raw_values: np.ndarray) -> None:
        nonlocal score
        weight = float(weights.get(name, 0.0))
        if weight == 0:
            return
        values = robust_z(raw_values) if normalize_features else raw_values
        score += weight * values

    n_hist = len(hist)
    if n_hist == 0:
        return score

    recent_n = int(weights.get("recent_n", 300))
    recent_n = max(1, min(recent_n, n_hist))
    h_recent = hist.tail(recent_n)

    # 1) Global positional frequency
    if weights.get("freq_global", 0):
        values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
        for j, col in enumerate(POS_COLS):
            counts = np.bincount(hist[col].to_numpy(dtype=int), minlength=10)
            values += logprob_from_counts(counts, digits[:, j], n_hist, 10)
        add_feature("freq_global", values)

    # 2) Recent positional frequency
    if weights.get("freq_recent", 0):
        values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
        for j, col in enumerate(POS_COLS):
            counts = np.bincount(h_recent[col].to_numpy(dtype=int), minlength=10)
            values += logprob_from_counts(counts, digits[:, j], len(h_recent), 10)
        add_feature("freq_recent", values)

    # 3) Exponential decay frequency
    if weights.get("freq_decay", 0):
        halflife = float(weights.get("decay_halflife", 220))
        halflife = max(1.0, halflife)
        age = np.arange(n_hist - 1, -1, -1, dtype=np.float64)
        decay_w = np.power(0.5, age / halflife)
        denom = float(decay_w.sum())
        values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
        for j, col in enumerate(POS_COLS):
            counts = weighted_bincount(hist[col].to_numpy(dtype=int), decay_w, minlength=10)
            values += logprob_from_counts(counts, digits[:, j], denom, 10)
        add_feature("freq_decay", values)

    # 4) Gap analysis
    if weights.get("gap", 0):
        gaps = get_gap_matrix_fast(hist)
        values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
        for j in range(4):
            # cap gap agar outlier lama tidak terlalu mendominasi
            g = gaps[j, digits[:, j]]
            values += np.log1p(np.minimum(g, np.nanpercentile(g, 95)))
        add_feature("gap", values)

    # 5) Intra-number transition: posisi d1->d2, d2->d3, d3->d4
    if weights.get("intra_corr", 0):
        matrices = build_intra_transition_probs(hist)
        values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
        for j in range(3):
            values += np.log(matrices[j][digits[:, j], digits[:, j + 1]] + EPS)
        add_feature("intra_corr", values)

    # 6) Sequential transition: digit baris sebelumnya -> kandidat saat ini
    if weights.get("seq_corr", 0):
        matrices = build_seq_transition_probs(hist)
        values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
        for j in range(4):
            prev_digit = int(prev_row[f"d{j + 1}"])
            values += np.log(matrices[j][prev_digit, digits[:, j]] + EPS)
        add_feature("seq_corr", values)

    # 7) Sum, odd/even, high/low distributions
    if weights.get("sum", 0):
        counts = np.bincount(hist["sum_digits"].to_numpy(dtype=int), minlength=37)
        add_feature("sum", logprob_from_counts(counts, UNIVERSE.sums, n_hist, 37))

    if weights.get("odd_even", 0):
        counts = np.bincount(hist["odd_count"].to_numpy(dtype=int), minlength=5)
        add_feature("odd_even", logprob_from_counts(counts, UNIVERSE.odd, n_hist, 5))

    if weights.get("high_low", 0):
        counts = np.bincount(hist["high_count"].to_numpy(dtype=int), minlength=5)
        add_feature("high_low", logprob_from_counts(counts, UNIVERSE.high, n_hist, 5))

    # 8) Hour/slot-specific positional frequency
    if weights.get("hour", 0):
        h_hour = hist[hist["hour"] == target_hour]
        if len(h_hour) >= 10:
            values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
            for j, col in enumerate(POS_COLS):
                counts = np.bincount(h_hour[col].to_numpy(dtype=int), minlength=10)
                values += logprob_from_counts(counts, digits[:, j], len(h_hour), 10)
            add_feature("hour", values)

    # 9) Learned pattern distribution by unique digit count
    if weights.get("pattern", 0):
        # unique_count: 1=quad, 2=triple/two-pair, 3=one double, 4=all unique
        counts = np.bincount(hist["unique_count"].to_numpy(dtype=int), minlength=5)
        values = logprob_from_counts(counts, UNIVERSE.unique_counts, n_hist, 5)
        # tambahan penalti ringan untuk quad agar tidak terlalu sering naik karena sparse
        values = values.copy()
        values[UNIVERSE.unique_counts == 1] -= 0.75
        add_feature("pattern", values)

    # 10) Recent repeat penalty
    if weights.get("repeat_penalty", 0):
        lookback = int(weights.get("repeat_lookback", 80))
        lookback = max(1, min(lookback, n_hist))
        recent_numbers = hist["raw_number"].tail(lookback).astype(int).to_numpy()
        seen_mask = np.zeros(10000, dtype=np.float64)
        seen_mask[recent_numbers] = 1.0
        # feature negatif; weight positif => angka yang baru muncul mendapat penalti
        add_feature("repeat_penalty", -seen_mask)

    return score


# -----------------------------
# 6) TIMESTAMP INFERENCE
# -----------------------------
def infer_next_timestamp(data: pd.DataFrame, slot_order: List[str]) -> Tuple[pd.Timestamp, str]:
    last_ts = data["ts"].iloc[-1]
    last_hour = last_ts.strftime("%H:%M")

    if last_hour in slot_order:
        idx = slot_order.index(last_hour)
        next_hour = slot_order[(idx + 1) % len(slot_order)]
        next_date = last_ts.date()
        if idx == len(slot_order) - 1:
            next_date = (last_ts + pd.Timedelta(days=1)).date()
        return pd.Timestamp(f"{next_date} {next_hour}:00"), next_hour

    unique_ts = data["ts"].drop_duplicates().sort_values()
    gap = unique_ts.diff().dropna().median()
    if pd.isna(gap):
        gap = pd.Timedelta(hours=6)
    next_ts = last_ts + gap
    return next_ts, next_ts.strftime("%H:%M")


# -----------------------------
# 7) BACKTEST WALK-FORWARD
# -----------------------------
def evaluate_weights(
    data: pd.DataFrame,
    weights: Dict[str, float],
    last_n: int = 300,
    min_train: int = 120,
    top_checks: Tuple[int, ...] = (10, 20, 50, 100),
) -> Dict[str, float]:
    start = max(min_train, len(data) - last_n)
    if start >= len(data):
        raise ValueError("Data terlalu sedikit untuk backtest. Tambahkan data atau turunkan min_train/last_n.")

    ranks: List[int] = []
    for i in range(start, len(data)):
        hist = data.iloc[:i]
        target = data.iloc[i]
        prev = data.iloc[i - 1]
        scores = score_candidates(hist, prev, target["hour"], weights)
        actual_idx = int(target["raw_number"])
        # rank 1 = skor tertinggi
        rank = int(1 + np.sum(scores > scores[actual_idx]))
        ranks.append(rank)

    ranks_arr = np.array(ranks, dtype=np.float64)
    result: Dict[str, float] = {
        "n_test": float(len(ranks_arr)),
        "median_rank": float(np.median(ranks_arr)),
        "mean_rank": float(np.mean(ranks_arr)),
        "mrr": float(np.mean(1.0 / ranks_arr)),
    }
    for k in top_checks:
        result[f"top{k}"] = float(np.mean(ranks_arr <= k))
    return result


def choose_best_model(results_df: pd.DataFrame) -> str:
    sort_cols = ["top10", "top20", "top50", "mrr", "median_rank"]
    ascending = [False, False, False, False, True]
    available_cols = [c for c in sort_cols if c in results_df.columns]
    asc = [ascending[sort_cols.index(c)] for c in available_cols]
    return str(results_df.sort_values(available_cols, ascending=asc).iloc[0]["model"])


# -----------------------------
# 8) PREDICTION EXECUTION
# -----------------------------
def generate_predictions(
    data: pd.DataFrame,
    weights: Dict[str, float],
    steps: int,
    top_k: int,
    slot_order: List[str],
) -> Tuple[List[str], pd.DataFrame, pd.Timestamp, str]:
    next_ts, next_hour = infer_next_timestamp(data, slot_order)
    hist = data.copy()
    prev = hist.iloc[-1]
    main_predictions: List[str] = []
    all_rows: List[Dict[str, object]] = []

    for step in range(1, steps + 1):
        scores = score_candidates(hist, prev, next_hour, weights)

        # Jangan duplikat prediksi utama antar-step simulasi
        for p in main_predictions:
            scores[int(p)] = -1e18

        idxs = top_k_indices(scores, top_k)
        for rank, idx in enumerate(idxs, 1):
            all_rows.append(
                {
                    "target_timestamp": str(next_ts),
                    "target_hour": next_hour,
                    "step": step,
                    "rank": rank,
                    "number": UNIVERSE.nums[idx],
                    "score": round(float(scores[idx]), 6),
                    "sum_digits": int(UNIVERSE.sums[idx]),
                    "odd_count": int(UNIVERSE.odd[idx]),
                    "high_count": int(UNIVERSE.high[idx]),
                    "unique_count": int(UNIVERSE.unique_counts[idx]),
                }
            )

        winner = str(UNIVERSE.nums[idxs[0]])
        main_predictions.append(winner)

        # Simulasi autoregressive untuk step berikutnya.
        # Ini bukan fakta historis; hanya skenario model.
        new_row = {col: np.nan for col in hist.columns}
        new_row.update(
            {
                "timestamp": str(next_ts),
                "raw_number": winner,
                "ts": next_ts,
                "hour": next_hour,
                "_orig_order": len(hist),
            }
        )
        for j, c in enumerate(winner):
            new_row[f"d{j + 1}"] = int(c)
        winner_digits = [int(c) for c in winner]
        new_row["sum_digits"] = int(sum(winner_digits))
        new_row["odd_count"] = int(sum(d % 2 for d in winner_digits))
        new_row["high_count"] = int(sum(d >= 5 for d in winner_digits))
        new_row["unique_count"] = int(len(set(winner)))
        new_row["is_double"] = int(len(set(winner)) < 4)
        new_row["is_triple_or_quad"] = int(len(set(winner)) <= 2)
        new_row["is_quad"] = int(len(set(winner)) == 1)

        hist = pd.concat([hist, pd.DataFrame([new_row])], ignore_index=True)
        prev = hist.iloc[-1]

    return main_predictions, pd.DataFrame(all_rows), next_ts, next_hour


# -----------------------------
# 9) CLI / MAIN
# -----------------------------
def parse_args(argv: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Prediksi 4D Advanced Pro Upgrade")
    parser.add_argument("--csv", default="PRIZE_LEVEL_4_ELITE.csv", help="Path CSV input")
    parser.add_argument("--output", default=DEFAULT_OUTPUT, help="Path output prediksi CSV")
    parser.add_argument("--summary-output", default=DEFAULT_SUMMARY, help="Path output ringkasan backtest CSV")
    parser.add_argument("--backtest-last-n", type=int, default=300, help="Jumlah baris terakhir untuk backtest")
    parser.add_argument("--min-train", type=int, default=120, help="Minimum data training sebelum evaluasi")
    parser.add_argument("--steps", type=int, default=3, help="Jumlah prediksi utama bertahap")
    parser.add_argument("--top-k", type=int, default=20, help="Jumlah kandidat per step")
    parser.add_argument(
        "--slot-order",
        default=",".join(DEFAULT_SLOT_ORDER),
        help="Urutan jam draw, pisahkan koma. Contoh: 03:30,06:30,11:30,13:30,17:30,21:30",
    )
    return parser.parse_args(argv)


def main(argv: List[str] = None) -> int:
    args = parse_args(sys.argv[1:] if argv is None else argv)
    csv_path = resolve_csv_path(args.csv)
    print(f"Membaca file: {csv_path}")
    raw_df = pd.read_csv(csv_path, dtype={"raw_number": str})
    df = prepare_data(raw_df)

    if len(df) < args.min_train + 5:
        raise ValueError(f"Data terlalu sedikit: {len(df)} baris. Minimal disarankan > {args.min_train + 5}.")

    slot_order = [x.strip() for x in str(args.slot_order).split(",") if x.strip()]
    if not slot_order:
        slot_order = DEFAULT_SLOT_ORDER

    print("\n===== INFO DATA =====")
    print(f"Total baris valid : {len(df)}")
    print(f"Periode data : {df['ts'].min()} s/d {df['ts'].max()}")
    print(f"Slot terdeteksi : {', '.join(sorted(df['hour'].unique()))}")

    print("\n===== BACKTEST WALK-FORWARD TANPA LEAKAGE =====")
    results = []
    for name, weights in WEIGHT_SETS.items():
        res = evaluate_weights(df, weights, last_n=args.backtest_last_n, min_train=args.min_train)
        row = {"model": name, **res}
        results.append(row)
        print(
            f"{name:16s} | "
            f"Top10={res['top10']:.2%} | Top20={res['top20']:.2%} | "
            f"Top50={res['top50']:.2%} | MedRank={res['median_rank']:.0f} | MRR={res['mrr']:.6f}"
        )

    results_df = pd.DataFrame(results)
    best_model = choose_best_model(results_df)
    best_weights = WEIGHT_SETS[best_model]
    results_df.to_csv(args.summary_output, index=False)
    print(f"\n>>> Model terpilih: {best_model}")
    print(f"Ringkasan backtest disimpan: {args.summary_output}")

    main_preds, full_output, next_ts, next_hour = generate_predictions(
        df, best_weights, steps=args.steps, top_k=args.top_k, slot_order=slot_order
    )

    print("\n===== TARGET =====")
    print(f"Target Timestamp: {next_ts} ({next_hour})")
    print("\n===== HASIL PREDIKSI UTAMA =====")
    for i, number in enumerate(main_preds, 1):
        print(f"Urutan {i}: {number}")

    print(f"\n===== TOP {args.top_k} KANDIDAT PER STEP =====")
    print(full_output.to_string(index=False))
    full_output.to_csv(args.output, index=False)
    print(f"\nOutput prediksi disimpan: {args.output}")

    # Auto-download kalau berjalan di Colab
    try:
        from google.colab import files  # type: ignore

        files.download(args.output)
        files.download(args.summary_output)
    except Exception:
        pass

    return 0


if __name__ == "__main__":
    raise SystemExit(main())
'''

Path('/mnt/data/prediksi_4d_pro_upgrade.py').write_text(script, encoding='utf-8')
print(Path('/mnt/data/prediksi_4d_pro_upgrade.py').stat().st_size)
print(Path('/mnt/data/prediksi_4d_pro_upgrade.py').read_text(encoding='utf-8')[:500])
# Notebook cell: run the generated script on the uploaded CSV with the current
# interpreter, then print the exit code, elapsed seconds and the tail of both
# output streams.
import json
import os
import pathlib
import subprocess
import sys
import textwrap
import time

cmd = [
    sys.executable, '/mnt/data/prediksi_4d_pro_upgrade.py',
    '--csv', '/mnt/data/PRIZE_LEVEL_4_ELITE.csv',
    '--output', '/mnt/data/prediksi_pro_4d_upgrade.csv',
    '--summary-output', '/mnt/data/ringkasan_backtest_4d_upgrade.csv',
    '--backtest-last-n', '300',
    '--top-k', '20',
]
start = time.time()
# subprocess.run raises TimeoutExpired if the script takes over 300 seconds.
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=300)
print('return', proc.returncode, 'time', time.time() - start)
print(proc.stdout[-5000:])
print('ERR', proc.stderr[-2000:])
# Notebook cell: load the two CSV outputs, preview them, and confirm that the
# script and both output files exist.
import os

import pandas as pd

summary = pd.read_csv('/mnt/data/ringkasan_backtest_4d_upgrade.csv')
# 'number' is read as text so leading zeros are preserved.
pred = pd.read_csv('/mnt/data/prediksi_pro_4d_upgrade.csv', dtype={'number': str})
print(summary)
print(pred.head(10))
print(pred[pred['step'] == 1].head(5))
print(pred[pred['step'] == 2].head(5))
print(pred[pred['step'] == 3].head(5))
print(
    os.path.exists('/mnt/data/prediksi_4d_pro_upgrade.py'),
    os.path.exists('/mnt/data/prediksi_pro_4d_upgrade.csv'),
    os.path.exists('/mnt/data/ringkasan_backtest_4d_upgrade.csv'),
)
# Notebook cell: print the full backtest summary and the first three rows of
# each prediction step without column truncation. Uses pd, summary and pred
# from an earlier cell.
pd.set_option('display.max_columns', None)
print(summary.to_string(index=False))
print(pred.groupby('step').head(3).to_string(index=False))
Sudah saya upgrade dan test memakai file CSV yang Anda upload.
File hasil upgrade:
Download script upgrade .py
Download output prediksi CSV
Download ringkasan backtest CSV
Perubahan utama yang saya masukkan:
Backtest tanpa data leakage
Transition matrix sekarang dihitung dari histori saat itu saja, bukan dari full dataset masa depan.
target_hour dan prev_row sekarang benar-benar dipakai
Versi lama menerima parameter itu, tapi efeknya hampir tidak ada.
Scoring lebih stabil
Semua fitur utama dinormalisasi dengan robust z-score supaya bobot tidak berat sebelah karena beda skala.
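Fitur tambahan
Frekuensi dengan bobot peluruhan (freq_decay), transisi antar-draw (seq_corr), distribusi high/low (high_low), frekuensi per jam (hour), dan penalti untuk angka yang baru muncul (repeat_penalty).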
Output lebih lengkap
CSV prediksi sekarang berisi target_timestamp, step, rank, number, score, sum_digits, odd_count, high_count, dan unique_count.
Hasil test pada data Anda:
| Model | Top10 | Top20 | Top50 | Median Rank |
|---|---|---|---|---|
| balanced_v2 | 0.33% | 0.33% | 1.00% | 4068 |
| gap_hunter_v2 | 0.67% | 1.33% | 1.67% | 4364 |
| transition_v2 | 0.33% | 0.67% | 0.67% | 4164 |
| hour_pattern_v2 | 0.00% | 0.33% | 0.67% | 3875 |
Model terpilih oleh script: gap_hunter_v2.
Output utama dari run test:
| Urutan | Angka |
|---|---|
| 1 | 4851 |
| 2 | 1609 |
| 3 | 7540 |
Catatan penting: ini tetap analisis pola historis, bukan jaminan hasil acak/undian. Jalankan di Colab atau lokal seperti ini:
```bash
python prediksi_4d_pro_upgrade.py --csv PRIZE_LEVEL_4_ELITE.csv
```