# ===============================================...

تم الإنشاء في: ٨ يونيو ٢٠٢٦

تمت الإجابة باستخدام GPT-5.5 Thinking بواسطة Chat01

سؤال

PRIZE_LEVEL_4_ELITE.csv

# ============================================================
# SCRIPT PREDIKSI 4D - ADVANCED VERSION (PRO) - FIXED VERSION
# Perbaikan: Fixed Indexing Error & Optimized Pattern Calculation
# ============================================================

import os
import numpy as np
import pandas as pd

# -----------------------------
# 1) LOAD CSV
# -----------------------------

# Pick the input file. Inside Google Colab the user is asked to upload a CSV
# and the first uploaded file name is used; anywhere else (or when the upload
# fails for any reason) the script falls back to a file in the working directory.
try:
    from google.colab import files

    print('Silakan upload file CSV Anda...')
    uploaded = files.upload()
    CSV_PATH = next(iter(uploaded))
except Exception:
    # Deliberate best-effort: not running in Colab, nothing uploaded, etc.
    CSV_PATH = 'PRIZE_LEVEL_4_ELITE.csv'

print('Membaca file:', CSV_PATH)
# raw_number is read as text so that leading zeros ("0042") are preserved.
df = pd.read_csv(CSV_PATH, dtype={'raw_number': str})

# -----------------------------
# 2) CLEANING & FEATURE ENGINEERING
# -----------------------------

def prepare_data(data):
    """Clean the raw draw table and derive per-digit feature columns.

    Args:
        data: DataFrame with at least the columns ``timestamp`` and
            ``raw_number``. The input is not modified.

    Returns:
        A copy sorted chronologically (ties keep their original file order)
        with these columns added: ``_orig_order``, ``ts``, ``hour``,
        ``d1``..``d4``, ``sum_digits``, ``odd_count``, ``high_count``,
        ``is_double``, ``is_triple`` and ``is_quad``.

    Raises:
        ValueError: if ``timestamp`` or ``raw_number`` is missing.
    """
    data = data.copy()
    if 'timestamp' not in data.columns or 'raw_number' not in data.columns:
        raise ValueError("CSV wajib punya kolom 'timestamp' dan 'raw_number'.")

    data['_orig_order'] = np.arange(len(data))

    # Keep digits only, left-pad to four characters and keep the last four.
    # Note: a value without any digit therefore ends up as "0000".
    data['raw_number'] = (
        data['raw_number']
        .astype(str)
        .str.replace(r'\D', '', regex=True)
        .str.zfill(4)
        .str[-4:]
    )

    data['ts'] = pd.to_datetime(data['timestamp'])
    data = data.sort_values(['ts', '_orig_order'], kind='mergesort').reset_index(drop=True)
    data['hour'] = data['ts'].dt.strftime('%H:%M')

    digit_cols = ['d1', 'd2', 'd3', 'd4']
    for j, col in enumerate(digit_cols):
        data[col] = data['raw_number'].str[j].astype(int)

    data['sum_digits'] = data[digit_cols].sum(axis=1)
    data['odd_count'] = (data[digit_cols] % 2).sum(axis=1)
    data['high_count'] = (data[digit_cols] >= 5).sum(axis=1)

    unique_counts = data['raw_number'].apply(lambda s: len(set(s)))
    # Largest number of times a single digit occurs in the number. Using the
    # distinct-digit count alone would flag two-pair numbers such as "1122"
    # as triples, so is_triple is based on the actual repetition instead.
    max_repeat = data['raw_number'].apply(lambda s: max(s.count(ch) for ch in s))

    data['is_double'] = (unique_counts < 4).astype(int)
    data['is_triple'] = (max_repeat >= 3).astype(int)  # a quad also counts as a triple
    data['is_quad'] = (unique_counts == 1).astype(int)
    return data

# Replace the raw table with its cleaned, chronologically sorted version
# that carries the derived digit/feature columns.
df = prepare_data(df)

# -----------------------------
# 3) PRE-COMPUTING MATRICES (OPTIMIZED)
# -----------------------------

# Candidate universe: every 4-digit string "0000".."9999". Because the array
# is ordered, the integer value of a number is also its row index.
NUMS = np.array([f'{i:04d}' for i in range(10000)])
# DIGITS[i, j] is the j-th digit (0-based, left to right) of candidate i.
DIGITS = np.array([[int(c) for c in s] for s in NUMS], dtype=np.int16)
SUMS = DIGITS.sum(axis=1)          # digit sum, 0..36
ODD = (DIGITS % 2).sum(axis=1)     # number of odd digits, 0..4
HIGH = (DIGITS >= 5).sum(axis=1)   # number of digits >= 5, 0..4

# Precomputed number of DISTINCT digits per candidate, to speed up scoring:
#   4 = all digits different, 3 = exactly one pair,
#   2 = triple or two pairs,  1 = quad (all four digits equal).
UNIQUE_COUNTS_ARRAY = np.array([len(set(s)) for s in NUMS])

# Transition matrices (correlation between adjacent digit positions).
#
# TRANSITION_PROBS[k][a, b] estimates P(d{k+2} == b | d{k+1} == a) inside ONE
# number, for k = 0, 1, 2. This is the quantity the scoring code looks up with
# a candidate's own adjacent digits, so both digits are taken from the same
# row (pairing row i with row i+1 would estimate a different relationship).
#
# NOTE: the matrices are built from the whole of `df`; any backtest that reads
# them sees rows that lie in its own future.
TRANSITION_PROBS = []
for j in range(1, 4):
    src_digits = df[f'd{j}'].to_numpy(dtype=int)
    dst_digits = df[f'd{j+1}'].to_numpy(dtype=int)
    # Start every cell at 0.1 (additive smoothing) so no probability is zero.
    matrix = np.full((10, 10), 0.1)
    # np.add.at accumulates correctly when the same (src, dst) pair repeats.
    np.add.at(matrix, (src_digits, dst_digits), 1.0)
    matrix /= matrix.sum(axis=1, keepdims=True)
    TRANSITION_PROBS.append(matrix)

def get_gap_matrix(hist):
    """Return how long ago each digit last appeared in each position.

    Args:
        hist: DataFrame with integer columns ``d1``..``d4`` in chronological
            order. Its index labels are irrelevant; only row positions count.

    Returns:
        Float array of shape (4, 10). Entry ``[j, digit]`` is
        ``len(hist) - p`` where ``p`` is the 0-based row position of the most
        recent occurrence of ``digit`` in column ``d{j+1}`` (so the newest row
        gives a gap of 1). A digit that never occurred gets ``len(hist)``.
    """
    n_rows = len(hist)
    # Default for every cell: "never seen" == full history length.
    gap_matrix = np.full((4, 10), float(n_rows))
    for j in range(4):
        column = hist[f'd{j+1}'].to_numpy()
        for digit in range(10):
            # Positional lookup: index labels must not be used as positions,
            # otherwise a sliced or re-indexed history yields bogus gaps.
            positions = np.flatnonzero(column == digit)
            if positions.size:
                gap_matrix[j, digit] = n_rows - positions[-1]
    return gap_matrix

# -----------------------------
# 4) CORE SCORING ENGINE (FIXED)
# -----------------------------

def score_candidates(hist, prev_row, target_hour, weights):
    """Score all 10,000 candidate numbers against a draw history.

    Args:
        hist: prepared history (columns ``d1``..``d4``, ``sum_digits``,
            ``odd_count``) in chronological order. Only these rows are used,
            so passing a prefix of the data gives a leak-free backtest.
        prev_row: most recent row; accepted for interface compatibility,
            no feature below reads it.
        target_hour: slot being predicted; accepted for interface
            compatibility, no feature below reads it.
        weights: dict of feature weights (``freq``, ``gap``, ``corr``, ``sum``,
            ``odd_even``, ``pattern``) plus ``recent_n`` (window size of the
            recent-frequency term, default 300). A missing or zero weight
            disables the feature.

    Returns:
        float64 array of length 10,000; index ``i`` is the log-score of the
        number ``f"{i:04d}"`` (higher is better).
    """
    logscore = np.zeros(len(NUMS), dtype=np.float64)
    pos_cols = ['d1', 'd2', 'd3', 'd4']
    recent_n = int(weights.get('recent_n', 300))

    # 1. Frequency score: smoothed per-position digit frequency, once over the
    #    whole history and once over the most recent rows.
    if weights.get('freq', 0):
        h_recent = hist.tail(recent_n)
        for j, col in enumerate(pos_cols):
            counts_g = np.bincount(hist[col].to_numpy(dtype=int), minlength=10)
            logscore += weights['freq'] * np.log(
                (counts_g[DIGITS[:, j]] + 1) / (len(hist) + 10)
            )
            counts_r = np.bincount(h_recent[col].to_numpy(dtype=int), minlength=10)
            # Normalise by the rows actually in the window; the history can be
            # shorter than recent_n.
            logscore += weights['freq'] * np.log(
                (counts_r[DIGITS[:, j]] + 1) / (len(h_recent) + 10)
            )

    # 2. Gap analysis score: digits that have been absent longer score higher.
    if weights.get('gap', 0):
        gaps = get_gap_matrix(hist)
        for j in range(4):
            logscore += weights['gap'] * np.log1p(gaps[j, DIGITS[:, j]])

    # 3. Correlation score: P(next position's digit | this position's digit)
    #    within one number. The matrix is estimated from `hist` only; a
    #    module-level matrix built from the full dataset would let a
    #    walk-forward backtest see rows from its own future.
    if weights.get('corr', 0):
        for j in range(3):
            src_digits = hist[pos_cols[j]].to_numpy(dtype=int)
            dst_digits = hist[pos_cols[j + 1]].to_numpy(dtype=int)
            trans = np.full((10, 10), 0.1)  # additive smoothing
            np.add.at(trans, (src_digits, dst_digits), 1.0)
            trans /= trans.sum(axis=1, keepdims=True)
            trans_p = trans[DIGITS[:, j], DIGITS[:, j + 1]]
            logscore += weights['corr'] * np.log(trans_p + 1e-9)

    # 4. Sum & odd/even: smoothed historical distribution of the digit sum
    #    (0..36) and of the number of odd digits (0..4).
    if weights.get('sum', 0):
        counts = np.bincount(hist['sum_digits'].to_numpy(dtype=int), minlength=37)
        logscore += weights['sum'] * np.log((counts[SUMS] + 1) / (len(hist) + 37))

    if weights.get('odd_even', 0):
        counts = np.bincount(hist['odd_count'].to_numpy(dtype=int), minlength=5)
        logscore += weights['odd_even'] * np.log((counts[ODD] + 1) / (len(hist) + 5))

    # 5. Pattern score: fixed prior that down-weights candidates with at most
    #    two distinct digits (0.1 versus 0.5 for everything else).
    if weights.get('pattern', 0):
        logscore += weights['pattern'] * np.log(
            np.where(UNIQUE_COUNTS_ARRAY <= 2, 0.1, 0.5)
        )

    return logscore

# -----------------------------
# 5) CONFIGURATIONS & BACKTEST
# -----------------------------

# Named weight presets compared in the backtest. Each preset maps a scoring
# feature to its weight; `recent_n` is the window size of the recent-frequency
# term rather than a weight.
WEIGHT_SETS = {
    'balanced_pro': dict(
        freq=0.8, gap=1.2, corr=1.5,
        sum=0.5, odd_even=0.3, pattern=0.4,
        recent_n=300,
    ),
    'gap_hunter': dict(
        freq=0.5, gap=2.5, corr=0.5,
        sum=0.2, odd_even=0.2, pattern=0.2,
        recent_n=200,
    ),
    'pattern_master': dict(
        freq=0.6, gap=0.5, corr=2.0,
        sum=0.8, odd_even=0.5, pattern=1.5,
        recent_n=400,
    ),
}

def infer_next_timestamp(data, slot_order=None, fallback_gap=None):
    """Guess the timestamp and ``HH:MM`` slot of the draw after the last row.

    Args:
        data: DataFrame with a datetime column ``ts`` in chronological order.
        slot_order: daily draw times as ``HH:MM`` strings in chronological
            order. Defaults to the six slots this script was written for.
        fallback_gap: interval used when the last draw is not on a known slot
            and no gap can be measured (fewer than two distinct timestamps).
            Anything ``pd.Timedelta`` accepts; defaults to 6 hours.

    Returns:
        ``(next_timestamp, next_hour)`` where ``next_hour`` is ``HH:MM``.
    """
    if slot_order is None:
        slot_order = ['03:30', '06:30', '11:30', '13:30', '17:30', '21:30']

    last_ts = data['ts'].iloc[-1]
    last_hour = last_ts.strftime('%H:%M')

    if last_hour in slot_order:
        idx = slot_order.index(last_hour)
        next_hour = slot_order[(idx + 1) % len(slot_order)]
        next_date = last_ts.date()
        # After the last slot of the day the next draw is tomorrow's first.
        if idx == len(slot_order) - 1:
            next_date = (last_ts + pd.Timedelta(days=1)).date()
        return pd.Timestamp(f'{next_date} {next_hour}:00'), next_hour

    # Unknown slot: extrapolate with the median spacing of distinct draws.
    unique_ts = data['ts'].drop_duplicates().sort_values()
    gap = unique_ts.diff().dropna().median()
    if pd.isna(gap):
        # No spacing can be measured; without this the result would be NaT.
        gap = pd.Timedelta(hours=6) if fallback_gap is None else pd.Timedelta(fallback_gap)
    next_ts = last_ts + gap
    return next_ts, next_ts.strftime('%H:%M')

def evaluate_weights(data, weights, last_n=300, min_train=100):
    """Walk-forward backtest of one weight preset.

    For each of the last ``last_n`` rows the candidates are scored using only
    the rows before it, and the rank of the number actually drawn is recorded
    (rank 1 = highest score).

    Args:
        data: prepared draw history in chronological order.
        weights: weight preset passed through to ``score_candidates``.
        last_n: maximum number of most recent rows to evaluate.
        min_train: minimum number of history rows required before the first
            evaluated row.

    Returns:
        ``{'top10': share of evaluated draws ranked in the top 10,
        'med_rank': median rank}``.

    Raises:
        ValueError: if no row is left to evaluate, i.e. the data has no more
            than ``min_train`` rows.
    """
    # At least one history row is always required because the row before the
    # target is passed along as `prev`.
    start = max(1, min_train, len(data) - last_n)
    if start >= len(data):
        raise ValueError(
            f'Data terlalu sedikit untuk backtest: {len(data)} baris, '
            f'butuh lebih dari {start}.'
        )

    hits_top10 = 0
    ranks = []
    for i in range(start, len(data)):
        hist = data.iloc[:i]
        target = data.iloc[i]
        prev = data.iloc[i - 1]
        scores = score_candidates(hist, prev, target['hour'], weights)
        # Candidates are ordered "0000".."9999", so the number is its index.
        actual = int(target['raw_number'])
        rank = int(1 + np.sum(scores > scores[actual]))
        ranks.append(rank)
        if rank <= 10:
            hits_top10 += 1

    return {'top10': hits_top10 / len(ranks), 'med_rank': np.median(ranks)}

# Backtest every preset and keep the best one.
print('\n===== BACKTESTING MODEL PRO =====')
results = []
for name, w in WEIGHT_SETS.items():
    res = evaluate_weights(df, w)
    results.append({'model': name, **res})
    print(f"Model {name}: Top10 Accuracy: {res['top10']:.2%}, Median Rank: {res['med_rank']}")

# Highest top-10 hit rate wins. Hit rates this small tie easily, so ties are
# broken by the lower median rank instead of being left to sort order.
ranking = pd.DataFrame(results).sort_values(
    ['top10', 'med_rank'], ascending=[False, True]
)
best_model_name = ranking.iloc[0]['model']
best_weights = WEIGHT_SETS[best_model_name]
print(f'\n>>> Model Terpilih: {best_model_name}')

# -----------------------------
# 6) PREDICTION EXECUTION
# -----------------------------

# Produce three main picks. After each pick the winner is appended to the
# history as if it had been drawn, and the candidates are scored again.
next_ts, next_hour = infer_next_timestamp(df)
print(f'\nTarget Timestamp: {next_ts} ({next_hour})')

hist = df.copy()
prev = hist.iloc[-1]
main_predictions = []
all_results = []

for step in range(1, 4):
    scores = score_candidates(hist, prev, next_hour, best_weights)

    # Earlier winners must not be selected again.
    for chosen in main_predictions:
        scores[int(chosen)] = -1e18

    top_indices = np.argsort(-scores)[:20]
    all_results.append(pd.DataFrame([
        {
            'step': step,
            'rank': rank,
            'number': NUMS[idx],
            'score': round(scores[idx], 4),
        }
        for rank, idx in enumerate(top_indices, 1)
    ]))

    winner = NUMS[top_indices[0]]
    main_predictions.append(winner)

    # Simulation for the next step: build a synthetic history row for the
    # winner. Columns not set below stay NaN.
    winner_digits = [int(ch) for ch in winner]
    distinct = len(set(winner))
    new_row = dict.fromkeys(hist.columns, np.nan)
    new_row.update({
        'timestamp': str(next_ts),
        'raw_number': winner,
        'ts': next_ts,
        'hour': next_hour,
    })
    for pos, digit in enumerate(winner_digits, 1):
        new_row[f'd{pos}'] = digit
    new_row['sum_digits'] = sum(winner_digits)
    new_row['odd_count'] = sum(digit % 2 for digit in winner_digits)
    new_row['high_count'] = sum(1 for digit in winner_digits if digit >= 5)
    new_row['is_double'] = 1 if distinct < 4 else 0
    new_row['is_triple'] = 1 if distinct <= 2 else 0
    new_row['is_quad'] = 1 if distinct == 1 else 0

    hist = pd.concat([hist, pd.DataFrame([new_row])], ignore_index=True)
    prev = hist.iloc[-1]

# Report the main picks, then the full top-20 table of every step.
print('\n===== HASIL PREDIKSI UTAMA =====')
# Iterate instead of indexing [0], [1], [2] so the report cannot raise an
# IndexError if the number of steps ever changes.
for position, number in enumerate(main_predictions, 1):
    print(f"Urutan {position}: {number}")

print('\n===== DAFTAR KANDIDAT TOP 20 =====')
full_output = pd.concat(all_results)
print(full_output.to_string(index=False))

full_output.to_csv('prediksi_pro_4d_fixed.csv', index=False)

# Best-effort auto-download when running in Google Colab. `except Exception`
# (rather than a bare `except:`) keeps Ctrl-C and SystemExit working.
try:
    from google.colab import files
    files.download('prediksi_pro_4d_fixed.csv')
except Exception:
    pass

# User request that followed the pasted script in the chat: "upgrade"

Python

python
import json
import os
import pathlib
import statistics
import sys
import textwrap

import numpy as np
import pandas as pd

# Quick look at the uploaded CSV: existence and size, then shape, first rows,
# dtypes, column names and last rows.
path = '/mnt/data/PRIZE_LEVEL_4_ELITE.csv'
file_exists = os.path.exists(path)
print(file_exists, os.path.getsize(path) if file_exists else None)

df = pd.read_csv(path, dtype={'raw_number': str})
for view in (df.shape, df.head(), df.dtypes, df.columns.tolist(), df.tail()):
    print(view)

Python

python
from pathlib import Path

# Source of the upgraded predictor, written to disk below. The text must keep
# its line structure: collapsed onto a few lines it starts with '#' and the
# written file is a single comment followed by invalid code.
script = r'''# ============================================================
# PREDIKSI 4D - ADVANCED PRO UPGRADE
# Fokus upgrade:
# - Backtest walk-forward tanpa data leakage
# - Transition matrix dihitung dari histori saat itu, bukan full data masa depan
# - target_hour dan prev_row benar-benar dipakai
# - Scoring dinormalisasi agar bobot lebih stabil
# - Output CSV + ringkasan model
#
# Catatan penting:
# Script ini hanya analisis pola historis/eksperimen statistik.
# Tidak ada metode yang bisa menjamin hasil undian/angka acak.
# ============================================================

import argparse
import os
import sys
import warnings
from dataclasses import dataclass
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd

warnings.filterwarnings("ignore", category=RuntimeWarning)

# -----------------------------
# 0) CONFIG
# -----------------------------
DEFAULT_SLOT_ORDER = ["03:30", "06:30", "11:30", "13:30", "17:30", "21:30"]
DEFAULT_OUTPUT = "prediksi_pro_4d_upgrade.csv"
DEFAULT_SUMMARY = "ringkasan_backtest_4d_upgrade.csv"
EPS = 1e-12

WEIGHT_SETS: Dict[str, Dict[str, float]] = {
    "balanced_v2": {
        "freq_global": 0.75,
        "freq_recent": 0.85,
        "freq_decay": 0.75,
        "gap": 0.85,
        "intra_corr": 0.90,
        "seq_corr": 0.95,
        "sum": 0.45,
        "odd_even": 0.35,
        "high_low": 0.30,
        "hour": 0.45,
        "pattern": 0.35,
        "repeat_penalty": 0.60,
        "recent_n": 300,
        "repeat_lookback": 80,
        "decay_halflife": 220,
    },
    "gap_hunter_v2": {
        "freq_global": 0.35,
        "freq_recent": 0.45,
        "freq_decay": 0.35,
        "gap": 1.65,
        "intra_corr": 0.45,
        "seq_corr": 0.45,
        "sum": 0.20,
        "odd_even": 0.20,
        "high_low": 0.15,
        "hour": 0.25,
        "pattern": 0.20,
        "repeat_penalty": 0.80,
        "recent_n": 220,
        "repeat_lookback": 120,
        "decay_halflife": 180,
    },
    "transition_v2": {
        "freq_global": 0.45,
        "freq_recent": 0.55,
        "freq_decay": 0.40,
        "gap": 0.35,
        "intra_corr": 1.35,
        "seq_corr": 1.50,
        "sum": 0.35,
        "odd_even": 0.35,
        "high_low": 0.25,
        "hour": 0.35,
        "pattern": 0.30,
        "repeat_penalty": 0.55,
        "recent_n": 360,
        "repeat_lookback": 80,
        "decay_halflife": 260,
    },
    "hour_pattern_v2": {
        "freq_global": 0.45,
        "freq_recent": 0.65,
        "freq_decay": 0.65,
        "gap": 0.45,
        "intra_corr": 0.75,
        "seq_corr": 0.70,
        "sum": 0.55,
        "odd_even": 0.50,
        "high_low": 0.40,
        "hour": 1.20,
        "pattern": 0.65,
        "repeat_penalty": 0.55,
        "recent_n": 420,
        "repeat_lookback": 90,
        "decay_halflife": 300,
    },
}


@dataclass(frozen=True)
class Universe:
    nums: np.ndarray
    digits: np.ndarray
    sums: np.ndarray
    odd: np.ndarray
    high: np.ndarray
    unique_counts: np.ndarray


def make_universe() -> Universe:
    nums = np.array([f"{i:04d}" for i in range(10000)])
    digits = np.array([[int(c) for c in s] for s in nums], dtype=np.int16)
    sums = digits.sum(axis=1)
    odd = (digits % 2).sum(axis=1)
    high = (digits >= 5).sum(axis=1)
    unique_counts = np.array([len(set(s)) for s in nums], dtype=np.int8)
    return Universe(nums=nums, digits=digits, sums=sums, odd=odd, high=high, unique_counts=unique_counts)


UNIVERSE = make_universe()
POS_COLS = ["d1", "d2", "d3", "d4"]


# -----------------------------
# 1) LOAD CSV
# -----------------------------
def resolve_csv_path(default_path: str) -> str:
    """Colab-friendly loader with CLI fallback."""
    if os.path.exists(default_path):
        return default_path

    # Fallback khusus Google Colab
    try:
        from google.colab import files  # type: ignore

        print("File tidak ditemukan di path default.")
        print("Silakan upload file CSV Anda...")
        uploaded = files.upload()
        return next(iter(uploaded.keys()))
    except Exception:
        return default_path


# -----------------------------
# 2) CLEANING & FEATURE ENGINEERING
# -----------------------------
def prepare_data(data: pd.DataFrame) -> pd.DataFrame:
    data = data.copy()
    required = {"timestamp", "raw_number"}
    missing = required - set(data.columns)
    if missing:
        raise ValueError(f"CSV wajib punya kolom: {sorted(required)}. Kolom hilang: {sorted(missing)}")

    data["_orig_order"] = np.arange(len(data), dtype=np.int64)
    data["raw_number"] = (
        data["raw_number"]
        .astype(str)
        .str.replace(r"\D", "", regex=True)
        .str.zfill(4)
        .str[-4:]
    )
    data["ts"] = pd.to_datetime(data["timestamp"], errors="coerce")
    bad_ts = int(data["ts"].isna().sum())
    if bad_ts:
        print(f"Warning: {bad_ts} baris timestamp invalid dibuang.")
        data = data.dropna(subset=["ts"]).copy()

    data = data.sort_values(["ts", "_orig_order"], kind="mergesort").reset_index(drop=True)
    data["hour"] = data["ts"].dt.strftime("%H:%M")

    for j in range(4):
        data[f"d{j + 1}"] = data["raw_number"].str[j].astype(int)

    digit_cols = ["d1", "d2", "d3", "d4"]
    data["sum_digits"] = data[digit_cols].sum(axis=1)
    data["odd_count"] = (data[digit_cols] % 2).sum(axis=1)
    data["high_count"] = (data[digit_cols] >= 5).sum(axis=1)

    unique_counts = data["raw_number"].apply(lambda s: len(set(s)))
    data["unique_count"] = unique_counts.astype(int)
    data["is_double"] = (unique_counts < 4).astype(int)
    data["is_triple_or_quad"] = (unique_counts <= 2).astype(int)
    data["is_quad"] = (unique_counts == 1).astype(int)
    return data


# -----------------------------
# 3) NUMERIC HELPERS
# -----------------------------
def robust_z(values: np.ndarray) -> np.ndarray:
    """Robust z-score agar skala fitur tidak mendominasi karena beda satuan."""
    values = np.asarray(values, dtype=np.float64)
    med = np.nanmedian(values)
    mad = np.nanmedian(np.abs(values - med))
    if not np.isfinite(mad) or mad < EPS:
        std = np.nanstd(values)
        if not np.isfinite(std) or std < EPS:
            return np.zeros_like(values, dtype=np.float64)
        return (values - np.nanmean(values)) / (std + EPS)
    return (values - med) / (1.4826 * mad + EPS)


def logprob_from_counts(counts: np.ndarray, idx: np.ndarray, denom: float, alpha_bins: int) -> np.ndarray:
    return np.log((counts[idx] + 1.0) / (denom + alpha_bins))


def weighted_bincount(values: np.ndarray, weights: np.ndarray, minlength: int) -> np.ndarray:
    return np.bincount(values.astype(int), weights=weights, minlength=minlength).astype(np.float64)


def top_k_indices(scores: np.ndarray, k: int) -> np.ndarray:
    k = int(min(k, len(scores)))
    idx = np.argpartition(-scores, k - 1)[:k]
    return idx[np.argsort(-scores[idx])]


# -----------------------------
# 4) MATRIX BUILDERS - NO LEAKAGE
# -----------------------------
def get_gap_matrix_fast(hist: pd.DataFrame) -> np.ndarray:
    gap_matrix = np.zeros((4, 10), dtype=np.float64)
    n = len(hist)
    for j, col in enumerate(POS_COLS):
        arr = hist[col].to_numpy(dtype=np.int16)
        last_seen = np.full(10, -1, dtype=np.int64)
        for pos, digit in enumerate(arr):
            last_seen[int(digit)] = pos
        gaps = np.where(last_seen >= 0, n - 1 - last_seen, n)
        gap_matrix[j] = gaps
    return gap_matrix


def build_intra_transition_probs(hist: pd.DataFrame, alpha: float = 0.20) -> List[np.ndarray]:
    """P(digit posisi j+1 | digit posisi j), dihitung hanya dari histori."""
    matrices: List[np.ndarray] = []
    for j in range(3):
        src = hist[f"d{j + 1}"].to_numpy(dtype=np.int16)
        dst = hist[f"d{j + 2}"].to_numpy(dtype=np.int16)
        matrix = np.full((10, 10), alpha, dtype=np.float64)
        np.add.at(matrix, (src, dst), 1.0)
        matrix /= matrix.sum(axis=1, keepdims=True)
        matrices.append(matrix)
    return matrices


def build_seq_transition_probs(hist: pd.DataFrame, alpha: float = 0.20) -> List[np.ndarray]:
    """P(digit saat ini di posisi j | digit draw/baris sebelumnya di posisi j)."""
    matrices: List[np.ndarray] = []
    if len(hist) < 2:
        return [np.full((10, 10), 0.1, dtype=np.float64) for _ in range(4)]
    for j in range(4):
        src = hist[f"d{j + 1}"].iloc[:-1].to_numpy(dtype=np.int16)
        dst = hist[f"d{j + 1}"].iloc[1:].to_numpy(dtype=np.int16)
        matrix = np.full((10, 10), alpha, dtype=np.float64)
        np.add.at(matrix, (src, dst), 1.0)
        matrix /= matrix.sum(axis=1, keepdims=True)
        matrices.append(matrix)
    return matrices


# -----------------------------
# 5) CORE SCORING ENGINE - UPGRADED
# -----------------------------
def score_candidates(
    hist: pd.DataFrame,
    prev_row: pd.Series,
    target_hour: str,
    weights: Dict[str, float],
    normalize_features: bool = True,
) -> np.ndarray:
    digits = UNIVERSE.digits
    score = np.zeros(len(UNIVERSE.nums), dtype=np.float64)

    def add_feature(name: str, raw_values: np.ndarray) -> None:
        nonlocal score
        weight = float(weights.get(name, 0.0))
        if weight == 0:
            return
        values = robust_z(raw_values) if normalize_features else raw_values
        score += weight * values

    n_hist = len(hist)
    if n_hist == 0:
        return score

    recent_n = int(weights.get("recent_n", 300))
    recent_n = max(1, min(recent_n, n_hist))
    h_recent = hist.tail(recent_n)

    # 1) Global positional frequency
    if weights.get("freq_global", 0):
        values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
        for j, col in enumerate(POS_COLS):
            counts = np.bincount(hist[col].to_numpy(dtype=int), minlength=10)
            values += logprob_from_counts(counts, digits[:, j], n_hist, 10)
        add_feature("freq_global", values)

    # 2) Recent positional frequency
    if weights.get("freq_recent", 0):
        values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
        for j, col in enumerate(POS_COLS):
            counts = np.bincount(h_recent[col].to_numpy(dtype=int), minlength=10)
            values += logprob_from_counts(counts, digits[:, j], len(h_recent), 10)
        add_feature("freq_recent", values)

    # 3) Exponential decay frequency
    if weights.get("freq_decay", 0):
        halflife = float(weights.get("decay_halflife", 220))
        halflife = max(1.0, halflife)
        age = np.arange(n_hist - 1, -1, -1, dtype=np.float64)
        decay_w = np.power(0.5, age / halflife)
        denom = float(decay_w.sum())
        values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
        for j, col in enumerate(POS_COLS):
            counts = weighted_bincount(hist[col].to_numpy(dtype=int), decay_w, minlength=10)
            values += logprob_from_counts(counts, digits[:, j], denom, 10)
        add_feature("freq_decay", values)

    # 4) Gap analysis
    if weights.get("gap", 0):
        gaps = get_gap_matrix_fast(hist)
        values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
        for j in range(4):
            # cap gap agar outlier lama tidak terlalu mendominasi
            g = gaps[j, digits[:, j]]
            values += np.log1p(np.minimum(g, np.nanpercentile(g, 95)))
        add_feature("gap", values)

    # 5) Intra-number transition: posisi d1->d2, d2->d3, d3->d4
    if weights.get("intra_corr", 0):
        matrices = build_intra_transition_probs(hist)
        values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
        for j in range(3):
            values += np.log(matrices[j][digits[:, j], digits[:, j + 1]] + EPS)
        add_feature("intra_corr", values)

    # 6) Sequential transition: digit baris sebelumnya -> kandidat saat ini
    if weights.get("seq_corr", 0):
        matrices = build_seq_transition_probs(hist)
        values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
        for j in range(4):
            prev_digit = int(prev_row[f"d{j + 1}"])
            values += np.log(matrices[j][prev_digit, digits[:, j]] + EPS)
        add_feature("seq_corr", values)

    # 7) Sum, odd/even, high/low distributions
    if weights.get("sum", 0):
        counts = np.bincount(hist["sum_digits"].to_numpy(dtype=int), minlength=37)
        add_feature("sum", logprob_from_counts(counts, UNIVERSE.sums, n_hist, 37))

    if weights.get("odd_even", 0):
        counts = np.bincount(hist["odd_count"].to_numpy(dtype=int), minlength=5)
        add_feature("odd_even", logprob_from_counts(counts, UNIVERSE.odd, n_hist, 5))

    if weights.get("high_low", 0):
        counts = np.bincount(hist["high_count"].to_numpy(dtype=int), minlength=5)
        add_feature("high_low", logprob_from_counts(counts, UNIVERSE.high, n_hist, 5))

    # 8) Hour/slot-specific positional frequency
    if weights.get("hour", 0):
        h_hour = hist[hist["hour"] == target_hour]
        if len(h_hour) >= 10:
            values = np.zeros(len(UNIVERSE.nums), dtype=np.float64)
            for j, col in enumerate(POS_COLS):
                counts = np.bincount(h_hour[col].to_numpy(dtype=int), minlength=10)
                values += logprob_from_counts(counts, digits[:, j], len(h_hour), 10)
            add_feature("hour", values)

    # 9) Learned pattern distribution by unique digit count
    if weights.get("pattern", 0):
        # unique_count: 1=quad, 2=triple/two-pair, 3=one double, 4=all unique
        counts = np.bincount(hist["unique_count"].to_numpy(dtype=int), minlength=5)
        values = logprob_from_counts(counts, UNIVERSE.unique_counts, n_hist, 5)
        # tambahan penalti ringan untuk quad agar tidak terlalu sering naik karena sparse
        values = values.copy()
        values[UNIVERSE.unique_counts == 1] -= 0.75
        add_feature("pattern", values)

    # 10) Recent repeat penalty
    if weights.get("repeat_penalty", 0):
        lookback = int(weights.get("repeat_lookback", 80))
        lookback = max(1, min(lookback, n_hist))
        recent_numbers = hist["raw_number"].tail(lookback).astype(int).to_numpy()
        seen_mask = np.zeros(10000, dtype=np.float64)
        seen_mask[recent_numbers] = 1.0
        # feature negatif; weight positif => angka yang baru muncul mendapat penalti
        add_feature("repeat_penalty", -seen_mask)

    return score


# -----------------------------
# 6) TIMESTAMP INFERENCE
# -----------------------------
def infer_next_timestamp(data: pd.DataFrame, slot_order: List[str]) -> Tuple[pd.Timestamp, str]:
    last_ts = data["ts"].iloc[-1]
    last_hour = last_ts.strftime("%H:%M")

    if last_hour in slot_order:
        idx = slot_order.index(last_hour)
        next_hour = slot_order[(idx + 1) % len(slot_order)]
        next_date = last_ts.date()
        if idx == len(slot_order) - 1:
            next_date = (last_ts + pd.Timedelta(days=1)).date()
        return pd.Timestamp(f"{next_date} {next_hour}:00"), next_hour

    unique_ts = data["ts"].drop_duplicates().sort_values()
    gap = unique_ts.diff().dropna().median()
    if pd.isna(gap):
        gap = pd.Timedelta(hours=6)
    next_ts = last_ts + gap
    return next_ts, next_ts.strftime("%H:%M")


# -----------------------------
# 7) BACKTEST WALK-FORWARD
# -----------------------------
def evaluate_weights(
    data: pd.DataFrame,
    weights: Dict[str, float],
    last_n: int = 300,
    min_train: int = 120,
    top_checks: Tuple[int, ...] = (10, 20, 50, 100),
) -> Dict[str, float]:
    start = max(min_train, len(data) - last_n)
    if start >= len(data):
        raise ValueError("Data terlalu sedikit untuk backtest. Tambahkan data atau turunkan min_train/last_n.")

    ranks: List[int] = []
    for i in range(start, len(data)):
        hist = data.iloc[:i]
        target = data.iloc[i]
        prev = data.iloc[i - 1]
        scores = score_candidates(hist, prev, target["hour"], weights)
        actual_idx = int(target["raw_number"])
        # rank 1 = skor tertinggi
        rank = int(1 + np.sum(scores > scores[actual_idx]))
        ranks.append(rank)

    ranks_arr = np.array(ranks, dtype=np.float64)
    result: Dict[str, float] = {
        "n_test": float(len(ranks_arr)),
        "median_rank": float(np.median(ranks_arr)),
        "mean_rank": float(np.mean(ranks_arr)),
        "mrr": float(np.mean(1.0 / ranks_arr)),
    }
    for k in top_checks:
        result[f"top{k}"] = float(np.mean(ranks_arr <= k))
    return result


def choose_best_model(results_df: pd.DataFrame) -> str:
    sort_cols = ["top10", "top20", "top50", "mrr", "median_rank"]
    ascending = [False, False, False, False, True]
    available_cols = [c for c in sort_cols if c in results_df.columns]
    asc = [ascending[sort_cols.index(c)] for c in available_cols]
    return str(results_df.sort_values(available_cols, ascending=asc).iloc[0]["model"])


# -----------------------------
# 8) PREDICTION EXECUTION
# -----------------------------
def generate_predictions(
    data: pd.DataFrame,
    weights: Dict[str, float],
    steps: int,
    top_k: int,
    slot_order: List[str],
) -> Tuple[List[str], pd.DataFrame, pd.Timestamp, str]:
    next_ts, next_hour = infer_next_timestamp(data, slot_order)
    hist = data.copy()
    prev = hist.iloc[-1]
    main_predictions: List[str] = []
    all_rows: List[Dict[str, object]] = []

    for step in range(1, steps + 1):
        scores = score_candidates(hist, prev, next_hour, weights)

        # Jangan duplikat prediksi utama antar-step simulasi
        for p in main_predictions:
            scores[int(p)] = -1e18

        idxs = top_k_indices(scores, top_k)
        for rank, idx in enumerate(idxs, 1):
            all_rows.append(
                {
                    "target_timestamp": str(next_ts),
                    "target_hour": next_hour,
                    "step": step,
                    "rank": rank,
                    "number": UNIVERSE.nums[idx],
                    "score": round(float(scores[idx]), 6),
                    "sum_digits": int(UNIVERSE.sums[idx]),
                    "odd_count": int(UNIVERSE.odd[idx]),
                    "high_count": int(UNIVERSE.high[idx]),
                    "unique_count": int(UNIVERSE.unique_counts[idx]),
                }
            )

        winner = str(UNIVERSE.nums[idxs[0]])
        main_predictions.append(winner)

        # Simulasi autoregressive untuk step berikutnya.
        # Ini bukan fakta historis; hanya skenario model.
        new_row = {col: np.nan for col in hist.columns}
        new_row.update(
            {
                "timestamp": str(next_ts),
                "raw_number": winner,
                "ts": next_ts,
                "hour": next_hour,
                "_orig_order": len(hist),
            }
        )
        for j, c in enumerate(winner):
            new_row[f"d{j + 1}"] = int(c)
        winner_digits = [int(c) for c in winner]
        new_row["sum_digits"] = int(sum(winner_digits))
        new_row["odd_count"] = int(sum(d % 2 for d in winner_digits))
        new_row["high_count"] = int(sum(d >= 5 for d in winner_digits))
        new_row["unique_count"] = int(len(set(winner)))
        new_row["is_double"] = int(len(set(winner)) < 4)
        new_row["is_triple_or_quad"] = int(len(set(winner)) <= 2)
        new_row["is_quad"] = int(len(set(winner)) == 1)

        hist = pd.concat([hist, pd.DataFrame([new_row])], ignore_index=True)
        prev = hist.iloc[-1]

    return main_predictions, pd.DataFrame(all_rows), next_ts, next_hour


# -----------------------------
# 9) CLI / MAIN
# -----------------------------
def parse_args(argv: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Prediksi 4D Advanced Pro Upgrade")
    parser.add_argument("--csv", default="PRIZE_LEVEL_4_ELITE.csv", help="Path CSV input")
    parser.add_argument("--output", default=DEFAULT_OUTPUT, help="Path output prediksi CSV")
    parser.add_argument("--summary-output", default=DEFAULT_SUMMARY, help="Path output ringkasan backtest CSV")
    parser.add_argument("--backtest-last-n", type=int, default=300, help="Jumlah baris terakhir untuk backtest")
    parser.add_argument("--min-train", type=int, default=120, help="Minimum data training sebelum evaluasi")
    parser.add_argument("--steps", type=int, default=3, help="Jumlah prediksi utama bertahap")
    parser.add_argument("--top-k", type=int, default=20, help="Jumlah kandidat per step")
    parser.add_argument(
        "--slot-order",
        default=",".join(DEFAULT_SLOT_ORDER),
        help="Urutan jam draw, pisahkan koma. Contoh: 03:30,06:30,11:30,13:30,17:30,21:30",
    )
    return parser.parse_args(argv)


def main(argv: List[str] = None) -> int:
    args = parse_args(sys.argv[1:] if argv is None else argv)
    csv_path = resolve_csv_path(args.csv)

    print(f"Membaca file: {csv_path}")
    raw_df = pd.read_csv(csv_path, dtype={"raw_number": str})
    df = prepare_data(raw_df)

    if len(df) < args.min_train + 5:
        raise ValueError(f"Data terlalu sedikit: {len(df)} baris. Minimal disarankan > {args.min_train + 5}.")

    slot_order = [x.strip() for x in str(args.slot_order).split(",") if x.strip()]
    if not slot_order:
        slot_order = DEFAULT_SLOT_ORDER

    print("\n===== INFO DATA =====")
    print(f"Total baris valid : {len(df)}")
    print(f"Periode data : {df['ts'].min()} s/d {df['ts'].max()}")
    print(f"Slot terdeteksi : {', '.join(sorted(df['hour'].unique()))}")

    print("\n===== BACKTEST WALK-FORWARD TANPA LEAKAGE =====")
    results = []
    for name, weights in WEIGHT_SETS.items():
        res = evaluate_weights(df, weights, last_n=args.backtest_last_n, min_train=args.min_train)
        row = {"model": name, **res}
        results.append(row)
        print(
            f"{name:16s} | "
            f"Top10={res['top10']:.2%} | Top20={res['top20']:.2%} | "
            f"Top50={res['top50']:.2%} | MedRank={res['median_rank']:.0f} | MRR={res['mrr']:.6f}"
        )

    results_df = pd.DataFrame(results)
    best_model = choose_best_model(results_df)
    best_weights = WEIGHT_SETS[best_model]
    results_df.to_csv(args.summary_output, index=False)

    print(f"\n>>> Model terpilih: {best_model}")
    print(f"Ringkasan backtest disimpan: {args.summary_output}")

    main_preds, full_output, next_ts, next_hour = generate_predictions(
        df, best_weights, steps=args.steps, top_k=args.top_k, slot_order=slot_order
    )

    print("\n===== TARGET =====")
    print(f"Target Timestamp: {next_ts} ({next_hour})")

    print("\n===== HASIL PREDIKSI UTAMA =====")
    for i, number in enumerate(main_preds, 1):
        print(f"Urutan {i}: {number}")

    print(f"\n===== TOP {args.top_k} KANDIDAT PER STEP =====")
    print(full_output.to_string(index=False))

    full_output.to_csv(args.output, index=False)
    print(f"\nOutput prediksi disimpan: {args.output}")

    # Auto-download kalau berjalan di Colab
    try:
        from google.colab import files  # type: ignore

        files.download(args.output)
        files.download(args.summary_output)
    except Exception:
        pass

    return 0


if __name__ == "__main__":
    raise SystemExit(main())
'''

# Write the script, then print its size and first 500 characters as a sanity check.
script_path = Path('/mnt/data/prediksi_4d_pro_upgrade.py')
script_path.write_text(script, encoding='utf-8')
print(script_path.stat().st_size)
print(script_path.read_text(encoding='utf-8')[:500])

Python

python
import json
import os
import pathlib
import subprocess
import sys
import textwrap
import time

# Run the generated script on the uploaded CSV and show the exit code, the
# elapsed time and the tail of its stdout/stderr.
cmd = [
    sys.executable,
    '/mnt/data/prediksi_4d_pro_upgrade.py',
    '--csv', '/mnt/data/PRIZE_LEVEL_4_ELITE.csv',
    '--output', '/mnt/data/prediksi_pro_4d_upgrade.csv',
    '--summary-output', '/mnt/data/ringkasan_backtest_4d_upgrade.csv',
    '--backtest-last-n', '300',
    '--top-k', '20',
]

start = time.time()
proc = subprocess.run(
    cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
    timeout=300,
)
print('return', proc.returncode, 'time', time.time() - start)
print(proc.stdout[-5000:])
print('ERR', proc.stderr[-2000:])

Python

python
import os

import pandas as pd

# Load both result files and preview them. `number` is read as text so that
# leading zeros survive.
summary = pd.read_csv('/mnt/data/ringkasan_backtest_4d_upgrade.csv')
pred = pd.read_csv('/mnt/data/prediksi_pro_4d_upgrade.csv', dtype={'number': str})

print(summary)
print(pred.head(10))
for step in (1, 2, 3):
    print(pred[pred['step'] == step].head(5))

# Confirm that all three artefacts exist on disk.
artefacts = (
    '/mnt/data/prediksi_4d_pro_upgrade.py',
    '/mnt/data/prediksi_pro_4d_upgrade.csv',
    '/mnt/data/ringkasan_backtest_4d_upgrade.csv',
)
print(*(os.path.exists(p) for p in artefacts))

Python

python
# Show every column, then print the whole backtest summary and the top three
# candidates of each prediction step.
pd.set_option('display.max_columns', None)
print(summary.to_string(index=False))

top3_per_step = pred.groupby('step').head(3)
print(top3_per_step.to_string(index=False))

الفكرة

Disclaiming prediction accuracy and clarifying

It seems I should finish in Indonesian and mention file links. I'll state “Saya sudah upgrade dan sudah saya test pada CSV.” I shouldn't overstate: the top10 accuracy at 0.67% is better than random (0.10%), but that’s still small. I’ll also add, "bukan kepastian," explaining that predictions aren’t guarantees. I’ll avoid direct betting advice.

Thought for 2m 54s

الإجابة

Sudah saya upgrade dan test memakai file CSV yang Anda upload.

File hasil upgrade:

Download script upgrade .py
Download output prediksi CSV
Download ringkasan backtest CSV

Perubahan utama yang saya masukkan:

  1. Backtest tanpa data leakage
    Transition matrix sekarang dihitung dari histori saat itu saja, bukan dari full dataset masa depan.

  2. target_hour dan prev_row sekarang benar-benar dipakai
    Versi lama menerima parameter itu, tapi efeknya hampir tidak ada.

  3. Scoring lebih stabil
    Semua fitur utama dinormalisasi dengan robust z-score supaya bobot tidak berat sebelah karena beda skala.

  4. Fitur tambahan

    • global positional frequency
    • recent frequency
    • exponential decay frequency
    • gap analysis capped
    • intra-number transition
    • sequential transition dari row sebelumnya
    • hour/slot pattern
    • odd/even, high/low, sum pattern
    • repeat penalty angka yang baru muncul
  5. Output lebih lengkap
    CSV prediksi sekarang berisi target_timestamp, step, rank, number, score, sum_digits, odd_count, high_count, dan unique_count.

Hasil test pada data Anda:

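| Model | Top10 | Top20 | Top50 | Median Rank |
|---|---|---|---|---|
| balanced_v2 | 0.33% | 0.33% | 1.00% | 4068 |
| gap_hunter_v2 | 0.67% | 1.33% | 1.67% | 4364 |
| transition_v2 | 0.33% | 0.67% | 0.67% | 4164 |
| hour_pattern_v2 | 0.00% | 0.33% | 0.67% | 3875 |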

Model terpilih oleh script: gap_hunter_v2.

Output utama dari run test:

| Urutan | Angka |
|---|---|
| 1 | 4851 |
| 2 | 1609 |
| 3 | 7540 |

Catatan penting: ini tetap analisis pola historis, bukan jaminan hasil acak/undian. Jalankan di Colab atau lokal seperti ini:

bash
python prediksi_4d_pro_upgrade.py --csv PRIZE_LEVEL_4_ELITE.csv

مشاركة هذا السؤال والجواب