# ================================================...

Created on: June 1, 2026

Answered using GPT-5.5 Thinking by Chat01

Question

PRIZE_clean_long.csv

# ============================================================
# Script Colab: Prediksi hasil PRIZE 4D dari PRIZE_clean_long.csv
# Algoritma: Weighted Markov + frekuensi digit terbaru + frekuensi kondisional waktu/slot
# Catatan penting: hasil 4D/lottery biasanya acak. Script ini hanya mencari pola statistik
# historis dan tidak bisa menjamin hasil berikutnya.
# ============================================================

import os
import numpy as np
import pandas as pd

# -------------------------
# 1) Upload / set file CSV
# -------------------------
# Di Google Colab, jalankan cell ini lalu upload PRIZE_clean_long.csv.

# Resolve CSV_PATH. When the google.colab helper is importable, ask for an
# upload and use the first uploaded file name; on any failure fall back to a
# list of well-known local paths.
try:
    from google.colab import files  # type: ignore

    print("Upload file PRIZE_clean_long.csv...")
    uploaded = files.upload()
    CSV_PATH = next(iter(uploaded.keys()))
except Exception:
    # Local / plain Jupyter: keep the CSV in the same folder as this script.
    # Deliberately broad: a missing google.colab module and an empty upload
    # (StopIteration from next()) both end up on the local-path fallback.
    candidates = [
        "PRIZE_clean_long.csv",
        "/content/PRIZE_clean_long.csv",
        "/mnt/data/PRIZE_clean_long.csv",
    ]
    # First candidate that exists; otherwise the first entry, so the later
    # read fails with a clear "file not found" for the expected name.
    CSV_PATH = next((p for p in candidates if os.path.exists(p)), candidates[0])

# -------------------------
# 2) Parameter utama
# -------------------------

RECENT_WINDOW = 144  # number of most recent rows used to read the recent digit trend
ALPHA = 1.0          # smoothing so that rarely seen digits still get some probability
TOP_K = 20           # number of candidates displayed
MIN_HISTORY = 60     # minimum history before the walk-forward evaluation starts

# Ensemble weights for the probability blend. They do not have to sum to 1
# because the blended distribution is normalized again afterwards.
WEIGHTS = {
    "recent": 0.35,       # recent digit trend
    "transition": 0.35,   # Markov: current digit -> next digit
    "conditional": 0.20,  # pattern conditioned on the current time + slot
    "global": 0.10,       # overall distribution across the whole history
}

# -------------------------
# 3) Load dan bersihkan data
# -------------------------

def load_data(path: str) -> pd.DataFrame:
    """Load the draw history CSV and derive one integer column per digit.

    Args:
        path: Location of the CSV file. ``result_4d`` is read as text so that
            leading zeros are kept.

    Returns:
        The data sorted by ``datetime``, ``slot_dalam_jam`` and (when the
        column exists) ``source_line``, with a fresh 0..n-1 index,
        ``result_4d`` left-padded with zeros and cut to its last four
        characters, and the added integer columns ``p1``..``p4`` holding the
        individual digits.

    Raises:
        ValueError: If any of the required columns is missing.
    """
    df = pd.read_csv(path, dtype={"result_4d": str})
    required = {"datetime", "waktu", "slot_dalam_jam", "result_4d"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Kolom wajib tidak ditemukan: {sorted(missing)}")

    df["datetime"] = pd.to_datetime(df["datetime"])
    # source_line is optional; it is only used as a tie-breaker when present.
    sort_cols = [c for c in ["datetime", "slot_dalam_jam", "source_line"] if c in df.columns]
    df = df.sort_values(sort_cols).reset_index(drop=True)

    # Normalize to exactly four characters before splitting into digits.
    df["result_4d"] = df["result_4d"].astype(str).str.zfill(4).str[-4:]
    for pos in range(4):
        df[f"p{pos+1}"] = df["result_4d"].str[pos].astype(int)
    return df

def digit_distribution(values, alpha: float = ALPHA) -> np.ndarray:
    """Return a smoothed probability distribution over the digits 0-9.

    Every digit starts with a pseudo-count of ``alpha`` (additive smoothing);
    each non-missing entry of ``values`` then adds one to its digit's count.

    Args:
        values: Digits to count. Missing entries are dropped and the rest are
            cast to ``int``.
        alpha: Pseudo-count given to each digit before counting.

    Returns:
        A length-10 float array of the counts divided by their sum.
    """
    counts = np.ones(10, dtype=float) * alpha
    vals = pd.Series(values).dropna().astype(int)
    if len(vals):
        for digit, count in vals.value_counts().items():
            counts[int(digit)] += float(count)
    return counts / counts.sum()

def make_training_pairs(observed: pd.DataFrame, mode: str) -> pd.DataFrame:
    """Pair every observed row with the digits of the row that follows it.

    Only rows inside ``observed`` are used, so a pair exists only when its
    target row is itself part of ``observed``.

    Args:
        observed: Rows with the digit columns ``p1``..``p4`` (and
            ``slot_dalam_jam`` for ``mode='same_slot'``).
        mode: ``'global'`` pairs row j with row j+1; ``'same_slot'`` pairs
            row j with the next row that has the same ``slot_dalam_jam``.

    Returns:
        A copy of ``observed`` with the target columns ``y1``..``y4`` added
        and the rows without a target removed. ``observed`` is not modified.

    Raises:
        ValueError: If ``mode`` is neither ``'global'`` nor ``'same_slot'``.
    """
    target_cols = ["y1", "y2", "y3", "y4"]

    if mode == "global":
        pair = observed.copy()
        for pos in range(1, 5):
            pair[f"y{pos}"] = observed[f"p{pos}"].shift(-1)
        return pair.dropna(subset=target_cols)

    if mode == "same_slot":
        pair = observed.copy()
        for pos in range(1, 5):
            # Shift inside each slot group so the target is the next row of
            # the same slot, not simply the next row of the table.
            pair[f"y{pos}"] = pair.groupby("slot_dalam_jam")[f"p{pos}"].shift(-1)
        return pair.dropna(subset=target_cols)

    raise ValueError("mode harus 'global' atau 'same_slot'")

def probabilities_for_row(df: pd.DataFrame, row_idx: int, mode: str) -> list[np.ndarray]:
    """Return the digit probabilities for positions p1..p4 after ``row_idx``.

    Only rows 0..``row_idx`` are used. For each position four smoothed digit
    distributions are blended with ``WEIGHTS`` and re-normalized:

    1. recent: the last ``RECENT_WINDOW`` digits (restricted to the current
       slot when ``mode == 'same_slot'``);
    2. global: all observed digits of that position;
    3. transition: targets of training pairs whose source digit equals the
       current digit (restricted to the current slot in ``'same_slot'`` mode);
    4. conditional: targets of training pairs whose ``waktu`` and
       ``slot_dalam_jam`` both match the current row.

    Args:
        df: History with ``waktu``, ``slot_dalam_jam`` and ``p1``..``p4``.
        row_idx: Positional index of the current (last known) row.
        mode: ``'global'`` or ``'same_slot'``; passed to ``make_training_pairs``.

    Returns:
        A list of four length-10 arrays, one per digit position.
    """
    observed = df.iloc[:row_idx + 1].copy()
    current = df.iloc[row_idx]
    pairs = make_training_pairs(observed, mode)

    # These filters do not depend on the digit position, so build them once.
    current_slot = current["slot_dalam_jam"]
    pair_in_slot = pairs["slot_dalam_jam"] == current_slot
    if mode == "same_slot":
        recent_source = observed[observed["slot_dalam_jam"] == current_slot]
    else:
        recent_source = observed
    cond_source = pairs[(pairs["waktu"] == current["waktu"]) & pair_in_slot]

    probs = []
    for pos in range(1, 5):
        # 1. Recent frequency
        p_recent = digit_distribution(recent_source[f"p{pos}"].tail(RECENT_WINDOW))

        # 2. Overall distribution
        p_global = digit_distribution(observed[f"p{pos}"])

        # 3. Markov transition: current digit -> next target digit
        cur_digit = int(current[f"p{pos}"])
        trans_mask = pairs[f"p{pos}"] == cur_digit
        if mode == "same_slot":
            trans_mask = trans_mask & pair_in_slot
        p_transition = digit_distribution(pairs.loc[trans_mask, f"y{pos}"], alpha=ALPHA * 0.7)

        # 4. Conditional on the current time + slot
        p_conditional = digit_distribution(cond_source[f"y{pos}"], alpha=ALPHA * 0.7)

        combined = (
            WEIGHTS["recent"] * p_recent
            + WEIGHTS["transition"] * p_transition
            + WEIGHTS["conditional"] * p_conditional
            + WEIGHTS["global"] * p_global
        )
        probs.append(combined / combined.sum())
    return probs

def top_combinations(probs: list[np.ndarray], k: int = TOP_K) -> pd.DataFrame:
    """Score every number 0000-9999 and return the ``k`` highest-scoring ones.

    The score of a number is the product of the probabilities of its four
    digits, one factor per position.

    Args:
        probs: Four length-10 arrays (positions 1 to 4), indexed by digit.
        k: Number of rows to return.

    Returns:
        A DataFrame with the columns ``rank`` (starting at 1),
        ``prediksi_4d`` (zero-padded four-character string) and ``score``,
        ordered by descending score.
    """
    # Broadcasting builds a 10x10x10x10 grid; the flat index of a cell is the
    # four-digit number itself (p1*1000 + p2*100 + p3*10 + p4).
    score_grid = (
        probs[0][:, None, None, None]
        * probs[1][None, :, None, None]
        * probs[2][None, None, :, None]
        * probs[3][None, None, None, :]
    )
    flat = score_grid.reshape(-1)
    idx = np.argsort(flat)[::-1][:k]
    rows = [
        {"rank": rank, "prediksi_4d": f"{int(i):04d}", "score": float(flat[i])}
        for rank, i in enumerate(idx, start=1)
    ]
    return pd.DataFrame(rows)

def true_next_result(df: pd.DataFrame, row_idx: int, mode: str):
    """Return the actual next ``result_4d`` for ``row_idx``, used for scoring.

    Rows are looked up by index label, so ``df`` is expected to carry the
    0..n-1 index that ``load_data`` produces.

    Args:
        df: Full history with ``result_4d`` and ``slot_dalam_jam``.
        row_idx: Index label of the current row.
        mode: ``'global'`` takes the very next row; ``'same_slot'`` takes the
            first later row with the same ``slot_dalam_jam``.

    Returns:
        The target ``result_4d`` value, or ``None`` when no later row exists.

    Raises:
        ValueError: If ``mode`` is neither ``'global'`` nor ``'same_slot'``.
    """
    if mode == "global":
        if row_idx + 1 >= len(df):
            return None
        return df.loc[row_idx + 1, "result_4d"]

    if mode == "same_slot":
        slot = df.loc[row_idx, "slot_dalam_jam"]
        future = df[(df.index > row_idx) & (df["slot_dalam_jam"] == slot)]
        if future.empty:
            return None
        return future.iloc[0]["result_4d"]

    raise ValueError("mode harus 'global' atau 'same_slot'")

def walk_forward_evaluation(df: pd.DataFrame, mode: str, top_list=(1, 5, 10, 20, 50, 100)) -> dict:
    """Score the model on the tail of the history without using future rows.

    Evaluation starts at row ``max(MIN_HISTORY, 80% of len(df))``. Each
    evaluated row gets a prediction from ``probabilities_for_row`` (which only
    sees rows up to that one) and is compared with ``true_next_result``.
    Rows without a known next result are skipped.

    Args:
        df: Full history as produced by ``load_data``.
        mode: ``'global'`` or ``'same_slot'``.
        top_list: Cut-offs ``k`` for which the exact-hit rate is reported.

    Returns:
        ``{"n": 0, "error": ...}`` when no row could be evaluated; otherwise a
        dict with ``mode``, ``n_test``, ``exact_hit_rate`` (share of rows
        whose true result is within the top ``k``) and
        ``top1_digit_accuracy_p1_p2_p3_p4`` (per-position share of matching
        digits between the top-1 prediction and the truth, rounded to 4
        decimals).
    """
    start = max(MIN_HISTORY, int(len(df) * 0.80))
    hits = {k: 0 for k in top_list}
    pos_hits = np.zeros(4, dtype=float)
    n = 0

    for row_idx in range(start, len(df)):
        truth = true_next_result(df, row_idx, mode)
        if truth is None:
            continue

        probs = probabilities_for_row(df, row_idx, mode)
        # One ranked list is enough: every smaller k is a prefix of it.
        preds = top_combinations(probs, k=max(top_list))["prediksi_4d"].tolist()
        for k in top_list:
            hits[k] += int(truth in preds[:k])

        top1 = preds[0]
        for pos in range(4):
            pos_hits[pos] += int(top1[pos] == truth[pos])
        n += 1

    if n == 0:
        return {"n": 0, "error": "Tidak ada baris evaluasi."}

    return {
        "mode": mode,
        "n_test": n,
        "exact_hit_rate": {f"top_{k}": hits[k] / n for k in top_list},
        "top1_digit_accuracy_p1_p2_p3_p4": (pos_hits / n).round(4).tolist(),
    }

def print_digit_probabilities(probs: list[np.ndarray]):
    """Print, per position, the digits 0-9 ordered by descending probability.

    Args:
        probs: One length-10 probability array per position, indexed by digit.
            Each line shows ``(digit, probability rounded to 4 decimals)``.
    """
    for pos, p in enumerate(probs, start=1):
        ranked = sorted(
            [(d, float(p[d])) for d in range(10)],
            key=lambda x: x[1],
            reverse=True,
        )
        print(f"Posisi {pos}:", [(d, round(prob, 4)) for d, prob in ranked])

# -------------------------
# 4) Jalankan analisis
# -------------------------

df = load_data(CSV_PATH)
print(f"Data terbaca: {len(df):,} baris")
print("Rentang waktu:", df["datetime"].min(), "sampai", df["datetime"].max())
print("Baris terakhir:")
print(df.tail(1)[["datetime", "waktu", "slot_dalam_jam", "result_4d"]])

# Evaluate the model on the most recent part of the history.
for mode in ["global", "same_slot"]:
    print("\n" + "="*70)
    print(f"Evaluasi walk-forward mode: {mode}")
    print(walk_forward_evaluation(df, mode))

# A) Predict one global result after the last row.
last_idx = len(df) - 1
print("\n" + "="*70)
print("Prediksi GLOBAL setelah baris terakhir")
probs_global = probabilities_for_row(df, last_idx, mode="global")
print_digit_probabilities(probs_global)
pred_global = top_combinations(probs_global, TOP_K)
print(pred_global)
pred_global.to_csv("prediksi_next_global.csv", index=False)

# B) Predict the next same-slot result for every slot of the last draw.
# Useful when the file holds 3 prizes/slots per draw time and you want a
# prediction for the next slot 1, 2 and 3.
print("\n" + "="*70)
print("Prediksi SAME_SLOT untuk slot-slot pada datetime terakhir")
last_datetime = df["datetime"].max()
last_rows = df[df["datetime"] == last_datetime].copy()
all_same_slot = []
for idx, row in last_rows.iterrows():
    print("\nSlot", row["slot_dalam_jam"], "| result terakhir", row["result_4d"], "| waktu", row["waktu"])
    # idx is the row's index label in df, which doubles as its position
    # because load_data resets the index.
    probs_slot = probabilities_for_row(df, int(idx), mode="same_slot")
    print_digit_probabilities(probs_slot)
    pred_slot = top_combinations(probs_slot, TOP_K)
    pred_slot.insert(0, "slot_dalam_jam", row["slot_dalam_jam"])
    pred_slot.insert(1, "last_result_4d", row["result_4d"])
    print(pred_slot)
    all_same_slot.append(pred_slot)

pred_same_slot = pd.concat(all_same_slot, ignore_index=True)
pred_same_slot.to_csv("prediksi_next_same_slot.csv", index=False)

print("\nFile output dibuat:")
print("- prediksi_next_global.csv")
print("- prediksi_next_same_slot.csv")
print("\nCatatan: Jika exact_hit_rate mendekati 0, artinya data tidak menunjukkan pola prediktif kuat. Gunakan hasil sebagai ranking statistik, bukan kepastian.")

Buat script ini top 5 paling akurat dan BBFS 5D TERAKURAT

Python

python
import pandas as pd, os, numpy as np, json, textwrap, itertools, math, statistics path="/mnt/data/PRIZE_clean_long.csv" df=pd.read_csv(path, dtype={"result_4d":str}) df.head(), df.tail(), df.shape, df.columns.tolist()[:20] ( source_line draw_id ... next_same_slot_d4 next_same_slot_missing_digits_hitung 0 4 0 ... 6.0 0,1,2,3,4,5 1 5 0 ... 2.0 0,1,3,4,5,6,7,8 2 6 0 ... 5.0 0,2,3,4,7,9 3 9 1 ... 9.0 0,1,3,4,5,6,8 4 10 1 ... 3.0 0,1,2,5,6,7,8 [5 rows x 54 columns], source_line ... next_same_slot_missing_digits_hitung 1966 3280 ... 2,3,4,5,6,8 1967 3281 ... 2,3,4,5,6,8,9 1968 3284 ... NaN 1969 3285 ... NaN 1970 3286 ... NaN [5 rows x 54 columns], (1971, 54), ['source_line', 'draw_id', 'datetime', 'tanggal', 'waktu', 'slot_dalam_jam', 'result_4d', 'd1', 'd2', 'd3', 'd4', 'present_digits', 'missing_digits_sumber', 'missing_digits_hitung', 'valid_missing_digits', 'sum_digits', 'digital_root', 'even_count', 'odd_count', 'low_0_4_count'])

Python

python
df["datetime"]=pd.to_datetime(df["datetime"]) df[["datetime","waktu","slot_dalam_jam","result_4d"]].head(), df[["datetime","waktu","slot_dalam_jam","result_4d"]].tail(), df.datetime.min(), df.datetime.max(), df.slot_dalam_jam.unique()[:10], df.waktu.unique()[:10] ( datetime waktu slot_dalam_jam result_4d 0 2026-02-10 21:30:00 21:30 1 6112 1 2026-02-10 21:30:00 21:30 2 0236 2 2026-02-10 21:30:00 21:30 3 7341 3 2026-02-11 03:30:00 03:30 1 9876 4 2026-02-11 03:30:00 03:30 2 2992, datetime waktu slot_dalam_jam result_4d 1966 2026-06-01 03:30:00 03:30 2 4489 1967 2026-06-01 03:30:00 03:30 3 4565 1968 2026-06-01 06:30:00 06:30 1 8553 1969 2026-06-01 06:30:00 06:30 2 0197 1970 2026-06-01 06:30:00 06:30 3 1070, Timestamp('2026-02-10 21:30:00'), Timestamp('2026-06-01 06:30:00'), array([1, 2, 3]), array(['21:30', '03:30', '06:30', '11:30', '13:30', '17:30'], dtype=object))

Python

python
import pandas as pd, numpy as np, itertools, os, math, time, json, textwrap, random CSV_PATH='/mnt/data/PRIZE_clean_long.csv' df=pd.read_csv(CSV_PATH, dtype={'result_4d':str}) def load_data_from_df(df): df=df.copy() df['datetime']=pd.to_datetime(df['datetime']) sort_cols=[c for c in ['datetime','slot_dalam_jam','source_line'] if c in df.columns] df=df.sort_values(sort_cols).reset_index(drop=True) df['result_4d']=df['result_4d'].astype(str).str.zfill(4).str[-4:] for pos in range(4): df[f'p{pos+1}']=df['result_4d'].str[pos].astype(int) return df df=load_data_from_df(df) len(df) 1971

Python

python
# Implement baseline functions with params passed, test speed on limited configs
def digit_distribution(values, alpha=1.0):
    """Return the smoothed distribution of digits 0-9 found in ``values``."""
    counts = np.ones(10, dtype=float) * alpha
    vals = pd.Series(values).dropna().astype(int)
    if len(vals):
        vc = vals.value_counts()
        for d, c in vc.items():
            counts[int(d)] += float(c)
    return counts / counts.sum()


def make_training_pairs(observed, mode):
    """Add target columns y1..y4 (next row, or next row of the same slot)."""
    pair = observed.copy()
    if mode == 'global':
        for pos in range(1, 5):
            pair[f'y{pos}'] = observed[f'p{pos}'].shift(-1)
        return pair.dropna(subset=['y1', 'y2', 'y3', 'y4'])
    if mode == 'same_slot':
        for pos in range(1, 5):
            pair[f'y{pos}'] = pair.groupby('slot_dalam_jam')[f'p{pos}'].shift(-1)
        return pair.dropna(subset=['y1', 'y2', 'y3', 'y4'])
    raise ValueError("mode harus 'global' atau 'same_slot'")


def probabilities_for_row(df, row_idx, mode, recent_window=144, alpha=1.0, weights=None):
    """Blend recent, transition, conditional and global digit distributions.

    Same model as the baseline script, but the window, smoothing and weights
    are parameters so they can be tuned. Only rows 0..row_idx are used.
    """
    if weights is None:
        weights = {'recent': .35, 'transition': .35, 'conditional': .20, 'global': .10}
    observed = df.iloc[:row_idx + 1].copy()
    current = df.iloc[row_idx]
    pairs = make_training_pairs(observed, mode)
    probs = []
    for pos in range(1, 5):
        if mode == 'same_slot':
            recent_source = observed[observed['slot_dalam_jam'] == current['slot_dalam_jam']]
        else:
            recent_source = observed
        p_recent = digit_distribution(recent_source[f'p{pos}'].tail(recent_window), alpha)
        p_global = digit_distribution(observed[f'p{pos}'], alpha)
        cur = int(current[f'p{pos}'])
        trans = pairs[pairs[f'p{pos}'] == cur]
        if mode == 'same_slot':
            trans = trans[trans['slot_dalam_jam'] == current['slot_dalam_jam']]
        p_transition = digit_distribution(trans[f'y{pos}'], alpha * 0.7)
        cond = pairs[
            (pairs['waktu'] == current['waktu'])
            & (pairs['slot_dalam_jam'] == current['slot_dalam_jam'])
        ]
        p_cond = digit_distribution(cond[f'y{pos}'], alpha * 0.7)
        combined = sum(
            weights[k] * v
            for k, v in [
                ('recent', p_recent),
                ('transition', p_transition),
                ('conditional', p_cond),
                ('global', p_global),
            ]
        )
        combined = combined / combined.sum()
        probs.append(combined)
    return probs


def top_combinations_list(probs, k=5):
    """Return the k best four-digit strings and their scores, best first."""
    score_grid = (
        probs[0][:, None, None, None]
        * probs[1][None, :, None, None]
        * probs[2][None, None, :, None]
        * probs[3][None, None, None, :]
    )
    flat = score_grid.ravel()
    # for top k use argpartition
    idx = np.argpartition(flat, -k)[-k:]
    # argpartition leaves the selected k unordered; sort just those.
    idx = idx[np.argsort(flat[idx])[::-1]]
    return [f"{i//1000}{(i//100)%10}{(i//10)%10}{i%10}" for i in idx], flat[idx]


def best_bbfs_digits(probs, size=5):
    """Pick the ``size``-digit set with the highest coverage score.

    The score of a digit set is the product, over the positions in ``probs``,
    of the summed probability of the digits in the set.
    """
    best = None
    best_score = -1
    for comb in itertools.combinations(range(10), size):
        s = 1.0
        for p in probs:
            s *= float(p[list(comb)].sum())
        if s > best_score:
            best_score = s
            best = comb
    return ''.join(map(str, best)), best_score


def true_next_result(df, row_idx, mode):
    """Return the actual next result, or None when there is none.

    Any mode other than 'global' is treated as same-slot here.
    """
    if mode == 'global':
        if row_idx + 1 >= len(df):
            return None
        return df.loc[row_idx + 1, 'result_4d']
    slot = df.loc[row_idx, 'slot_dalam_jam']
    future = df[(df.index > row_idx) & (df['slot_dalam_jam'] == slot)]
    if future.empty:
        return None
    return future.iloc[0]['result_4d']


def evaluate_config(df, mode, recent_window, alpha, weights, start=None, end=None,
                    top_list=(1, 5, 10, 20), max_top=20):
    """Walk-forward score of one parameter configuration.

    Rows ``start``..``end - 1`` are evaluated (default: from
    ``max(60, 80% of len(df))`` to the end). Returns None when no row has a
    known next result; otherwise a dict with the top-k exact hit rates, the
    top-1 per-position digit accuracy and ``bbfs5_cover``, the share of rows
    whose true digits all lie inside the chosen 5-digit set.

    ``max_top`` is accepted but not used; the list length is ``max(top_list)``.
    """
    if start is None:
        start = max(60, int(len(df) * 0.80))
    if end is None:
        end = len(df)
    hits = {k: 0 for k in top_list}
    pos_hits = np.zeros(4)
    bbfs_hits = 0
    n = 0
    for row_idx in range(start, end):
        truth = true_next_result(df, row_idx, mode)
        if truth is None:
            continue
        probs = probabilities_for_row(df, row_idx, mode, recent_window, alpha, weights)
        preds, _ = top_combinations_list(probs, k=max(top_list))
        for k in top_list:
            hits[k] += int(truth in preds[:k])
        top1 = preds[0]
        for pos in range(4):
            pos_hits[pos] += int(top1[pos] == truth[pos])
        bbfs, _ = best_bbfs_digits(probs, 5)
        bbfs_hits += all(ch in bbfs for ch in truth)
        n += 1
    if n == 0:
        return None
    return {
        'mode': mode,
        'n_test': n,
        'recent_window': recent_window,
        'alpha': alpha,
        'weights': weights,
        **{f'top_{k}': hits[k] / n for k in top_list},
        'digit_acc_avg': float((pos_hits / n).mean()),
        'digit_acc_pos': (pos_hits / n).round(4).tolist(),
        'bbfs5_cover': bbfs_hits / n,
    }


# small grid
WEIGHT_SETS = [
    {'recent': 0.35, 'transition': 0.35, 'conditional': 0.20, 'global': 0.10},
    {'recent': 0.45, 'transition': 0.25, 'conditional': 0.20, 'global': 0.10},
    {'recent': 0.25, 'transition': 0.45, 'conditional': 0.20, 'global': 0.10},
    {'recent': 0.25, 'transition': 0.25, 'conditional': 0.40, 'global': 0.10},
    {'recent': 0.20, 'transition': 0.40, 'conditional': 0.30, 'global': 0.10},
    {'recent': 0.50, 'transition': 0.30, 'conditional': 0.10, 'global': 0.10},
    {'recent': 0.20, 'transition': 0.20, 'conditional': 0.20, 'global': 0.40},
]
start = time.time()
results = []
for mode in ['global', 'same_slot']:
    for rw in [72, 144, 288]:
        for a in [0.5, 1.0, 2.0]:
            for w in WEIGHT_SETS:
                res = evaluate_config(df, mode, rw, a, w, top_list=(1, 5, 10, 20), max_top=20)
                results.append(res)
len(results), time.time() - start

Python

python
import pandas as pd, numpy as np, itertools, time, os, math, json df=pd.read_csv('/mnt/data/PRIZE_clean_long.csv', dtype={'result_4d':str}) df['datetime']=pd.to_datetime(df['datetime']) sort_cols=[c for c in ['datetime','slot_dalam_jam','source_line'] if c in df.columns] df=df.sort_values(sort_cols).reset_index(drop=True) df['result_4d']=df['result_4d'].astype(str).str.zfill(4).str[-4:] for pos in range(4): df[f'p{pos+1}']=df['result_4d'].str[pos].astype(int) def prepare_pairs(df, mode): pair = df[['waktu','slot_dalam_jam','p1','p2','p3','p4']].copy() if mode == 'global': pair['target_idx'] = pair.index + 1 for pos in range(1,5): pair[f'y{pos}'] = df[f'p{pos}'].shift(-1) pair = pair[pair['target_idx'] < len(df)] elif mode == 'same_slot': pair['target_idx'] = pair.groupby('slot_dalam_jam').apply(lambda g: pd.Series(g.index, index=g.index).shift(-1)).reset_index(level=0, drop=True) for pos in range(1,5): pair[f'y{pos}'] = pair.groupby('slot_dalam_jam')[f'p{pos}'].shift(-1) pair = pair.dropna(subset=['target_idx','y1','y2','y3','y4']) pair['target_idx'] = pair['target_idx'].astype(int) else: raise ValueError for pos in range(1,5): pair[f'y{pos}']=pair[f'y{pos}'].astype(int) return pair.reset_index().rename(columns={'index':'source_idx'}) pairs_by_mode={m:prepare_pairs(df,m) for m in ['global','same_slot']} pairs_by_mode['same_slot'].head(), pairs_by_mode['same_slot'].tail() ( source_idx waktu slot_dalam_jam p1 p2 ... target_idx y1 y2 y3 y4 0 0 21:30 1 6 1 ... 3 9 8 7 6 1 1 21:30 2 0 2 ... 4 2 9 9 2 2 2 21:30 3 7 3 ... 5 1 6 8 5 3 3 03:30 1 9 8 ... 6 2 2 7 9 4 4 03:30 2 2 9 ... 7 9 4 4 3 [5 rows x 12 columns], source_idx waktu slot_dalam_jam p1 p2 ... target_idx y1 y2 y3 y4 1963 1963 21:30 2 1 2 ... 1966 4 4 8 9 1964 1964 21:30 3 2 7 ... 1967 4 5 6 5 1965 1965 03:30 1 4 3 ... 1968 8 5 5 3 1966 1966 03:30 2 4 4 ... 1969 0 1 9 7 1967 1967 03:30 3 4 5 ... 1970 1 0 7 0 [5 rows x 12 columns])

Python

python
def dist_np(vals, alpha=1.0): vals=np.asarray(vals, dtype=int) counts=np.ones(10,dtype=float)*alpha if vals.size: counts += np.bincount(vals, minlength=10).astype(float) return counts/counts.sum() def probabilities_fast(df,row_idx,mode,pairs, recent_window=144, alpha=1.0, weights=None): if weights is None: weights={'recent':.35,'transition':.35,'conditional':.20,'global':.10} observed=df.iloc[:row_idx+1] current=df.iloc[row_idx] pair_avail=pairs[pairs['target_idx'] <= row_idx] probs=[] for pos in range(1,5): if mode=='same_slot': recent_vals=observed.loc[observed['slot_dalam_jam'].eq(current['slot_dalam_jam']), f'p{pos}'].tail(recent_window).values else

Thought

Simplifying pair creation logic

I can simplify pair preparation by applying `shift(-1)` directly to the column values within each group. Instead of shifting the row indices by hand inside a `groupby("slot_dalam_jam").apply(lambda ...)`, I’ll store each row's index in a `source_idx` column and compute `target_idx = pair.groupby("slot_dalam_jam")["source_idx"].shift(-1)`. This makes the process cleaner and more efficient.

Thought for 4m 38s

Answer

Di bawah ini versi script yang sudah diubah supaya fokus ke:

  1. Top 5 prediksi paling akurat berdasarkan walk-forward validation.
  2. BBFS 5D terakurat, yaitu 5 digit basis terbaik yang dipilih dari probabilitas gabungan 4 posisi.
  3. Auto-tuning RECENT_WINDOW, ALPHA, dan bobot ensemble.
  4. Output CSV untuk Top 5 dan BBFS 5D.

Catatan penting: “terakurat” di sini berarti terbaik menurut histori file CSV, bukan jaminan hasil berikutnya.

python
# ============================================================
# Colab script: PRIZE 4D prediction, Top 5 + BBFS 5D
# Algorithm:
# - Auto-tuned weighted Markov + recent frequency + conditional time/slot
# - Walk-forward validation without leaking future data
# - Outputs the historically best-scoring Top 5
# - Outputs BBFS 5D, i.e. the best 5 base digits
#
# Note:
# Lottery/4D draws are generally random. This script only produces a
# statistical ranking of the history. It guarantees nothing about the
# next result.
# ============================================================

import os
import json
import itertools

import numpy as np
import pandas as pd


# -------------------------
# 1) Main parameters
# -------------------------
TOP_K_FINAL = 5
BBFS_SIZE = 5
MIN_HISTORY = 60

# Auto-tuning grid. More choices = longer runtime.
TUNE_RECENT_WINDOWS = [72, 144, 288, 576]
TUNE_ALPHAS = [0.5, 1.0, 1.5, 2.0]

# Only the tail of the data is validated to keep tuning fast.
# Increase these for a stricter tuning run.
VALIDATION_TAIL = 420
VALIDATION_STRIDE = 2

WEIGHT_SETS = [
    {
        "name": "balanced",
        "weights": {
            "recent": 0.35,
            "transition": 0.35,
            "conditional": 0.20,
            "global": 0.10
        }
    },
    {
        "name": "recent_strong",
        "weights": {
            "recent": 0.50,
            "transition": 0.25,
            "conditional": 0.15,
            "global": 0.10
        }
    },
    {
        "name": "transition_strong",
        "weights": {
            "recent": 0.25,
            "transition": 0.50,
            "conditional": 0.15,
            "global": 0.10
        }
    },
    {
        "name": "conditional_strong",
        "weights": {
            "recent": 0.20,
            "transition": 0.25,
            "conditional": 0.45,
            "global": 0.10
        }
    },
    {
        "name": "recent_transition",
        "weights": {
            "recent": 0.45,
            "transition": 0.40,
            "conditional": 0.05,
            "global": 0.10
        }
    },
    {
        "name": "global_safe",
        "weights": {
            "recent": 0.25,
            "transition": 0.25,
            "conditional": 0.15,
            "global": 0.35
        }
    },
]


# -------------------------
# 2) Locate the CSV file
# -------------------------
def _resolve_csv_path() -> str:
    """Return the CSV path, via Colab upload when available.

    Outside Colab (import fails) or when the upload yields nothing, fall back
    to the first existing well-known location, or the bare file name.
    """
    try:
        from google.colab import files  # type: ignore

        print("Upload file PRIZE_clean_long.csv...")
        uploaded = files.upload()
        return next(iter(uploaded.keys()))
    except Exception:
        # Deliberate best-effort: any failure of the Colab path (missing
        # module, cancelled/empty upload) falls back to local candidates.
        candidates = [
            "PRIZE_clean_long.csv",
            "/content/PRIZE_clean_long.csv",
            "/mnt/data/PRIZE_clean_long.csv"
        ]
        return next((p for p in candidates if os.path.exists(p)), candidates[0])


# -------------------------
# 3) Load and clean the data
# -------------------------
def load_data(path: str) -> pd.DataFrame:
    """Read the results CSV and derive per-position digit columns.

    Args:
        path: Anything ``pd.read_csv`` accepts (path or file-like object).

    Returns:
        A frame sorted by ``datetime``, ``slot_dalam_jam`` and (when present)
        ``source_line``, re-indexed 0..n-1, with ``result_4d`` normalised to a
        4-character digit string and integer columns ``p1``..``p4``.

    Raises:
        ValueError: If a required column is missing, or if any ``result_4d``
            value is missing / not numeric after normalisation.
    """
    df = pd.read_csv(path, dtype={"result_4d": str})

    required = {"datetime", "waktu", "slot_dalam_jam", "result_4d"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Kolom wajib tidak ditemukan: {sorted(missing)}")

    df["datetime"] = pd.to_datetime(df["datetime"])

    sort_cols = [c for c in ["datetime", "slot_dalam_jam", "source_line"] if c in df.columns]
    df = df.sort_values(sort_cols).reset_index(drop=True)

    # Left-pad short values and keep the last four characters.
    normalized = df["result_4d"].astype(str).str.strip().str.zfill(4).str[-4:]

    # Fail early with a readable message instead of an opaque int() error
    # further down (e.g. NaN becomes the string "nan").
    valid = normalized.str.fullmatch(r"[0-9]{4}", na=False).astype(bool)
    if not valid.all():
        examples = df.loc[~valid, "result_4d"].head(5).tolist()
        raise ValueError(
            f"Nilai result_4d tidak valid pada {int((~valid).sum())} baris "
            f"(contoh: {examples})"
        )

    df["result_4d"] = normalized
    for pos in range(4):
        df[f"p{pos+1}"] = df["result_4d"].str[pos].astype(int)

    return df


def normalize_weights(weights: dict) -> dict:
    """Scale the weight values so they sum to 1.

    Raises:
        ValueError: If the weights do not sum to a positive number.
    """
    total = float(sum(weights.values()))
    if not total > 0:
        raise ValueError("Total bobot harus lebih besar dari 0")
    return {k: float(v) / total for k, v in weights.items()}


def digit_distribution(values, alpha: float) -> np.ndarray:
    """Return a smoothed probability vector over the digits 0-9.

    Every digit starts with a pseudo-count of ``alpha``; observed digits
    (NaN entries are dropped) are added on top, then the counts are
    normalised. With no mass at all (``alpha == 0`` and no observations) a
    uniform distribution is returned instead of NaN.
    """
    counts = np.ones(10, dtype=float) * alpha
    vals = pd.Series(values).dropna().astype(int).to_numpy()
    if len(vals):
        counts += np.bincount(vals, minlength=10)

    total = counts.sum()
    if total == 0:
        return np.full(10, 0.1)
    return counts / total


# -------------------------
# 4) Historical pairs without leakage
# -------------------------
def prepare_pairs(df: pd.DataFrame, mode: str) -> pd.DataFrame:
    """Build (source row -> following row) pairs.

    Each output row carries the source row's ``waktu``, ``slot_dalam_jam``,
    digits ``p1``..``p4`` and ``source_idx``, plus the index (``target_idx``)
    and digits (``y1``..``y4``) of the row that follows it. In ``"global"``
    mode the follower is the next row; in ``"same_slot"`` mode it is the next
    row with the same ``slot_dalam_jam``. Rows without a follower are dropped.

    Raises:
        ValueError: If ``mode`` is neither ``"global"`` nor ``"same_slot"``.
    """
    pair = df[["waktu", "slot_dalam_jam", "p1", "p2", "p3", "p4"]].copy()
    pair["source_idx"] = pair.index

    if mode == "global":
        pair["target_idx"] = pair["source_idx"] + 1
        for pos in range(1, 5):
            pair[f"y{pos}"] = df[f"p{pos}"].shift(-1)
        pair = pair[pair["target_idx"] < len(df)].copy()

    elif mode == "same_slot":
        by_slot = pair.groupby("slot_dalam_jam")
        pair["target_idx"] = by_slot["source_idx"].shift(-1)
        for pos in range(1, 5):
            pair[f"y{pos}"] = pair.groupby("slot_dalam_jam")[f"p{pos}"].shift(-1)
        pair = pair.dropna(subset=["target_idx", "y1", "y2", "y3", "y4"]).copy()

    else:
        raise ValueError("mode harus 'global' atau 'same_slot'")

    # shift() produced floats; the remaining rows have no NaN left.
    pair["target_idx"] = pair["target_idx"].astype(int)
    for pos in range(1, 5):
        pair[f"y{pos}"] = pair[f"y{pos}"].astype(int)

    return pair.reset_index(drop=True)


def build_next_index(df: pd.DataFrame, mode: str) -> np.ndarray:
    """Map every row to the index of its follower, or -1 if there is none.

    The frame's index labels are used as array positions, so the frame is
    expected to carry a 0..n-1 index (as produced by ``load_data``).

    Raises:
        ValueError: If ``mode`` is neither ``"global"`` nor ``"same_slot"``.
    """
    next_idx = np.full(len(df), -1, dtype=int)

    if mode == "global":
        next_idx[:-1] = np.arange(1, len(df))
        return next_idx

    if mode == "same_slot":
        for _, group in df.groupby("slot_dalam_jam", sort=False):
            idx = group.index.to_numpy()
            if len(idx) > 1:
                next_idx[idx[:-1]] = idx[1:]
        return next_idx

    raise ValueError("mode harus 'global' atau 'same_slot'")


# -------------------------
# 5) Per-position probabilities
# -------------------------
def probabilities_for_row(
    df: pd.DataFrame,
    row_idx: int,
    mode: str,
    pair_cache: dict,
    recent_window: int,
    alpha: float,
    weights: dict
) -> list[np.ndarray]:
    """Estimate the digit distribution of the row following ``row_idx``.

    Only rows up to and including ``row_idx`` and pairs whose target is at or
    before ``row_idx`` are used. Four estimates are blended per position with
    the normalised ``weights`` (keys ``recent``, ``transition``,
    ``conditional``, ``global``).

    Returns:
        A list of four length-10 probability vectors (positions 1..4).
    """
    weights = normalize_weights(weights)

    observed = df.iloc[:row_idx + 1]
    current = df.iloc[row_idx]
    pairs = pair_cache[mode]

    # Pairs whose target is already known at row_idx.
    # This keeps the evaluation free of future leakage.
    pair_avail = pairs[pairs["target_idx"] <= row_idx]

    cond_avail = pair_avail[
        (pair_avail["waktu"] == current["waktu"]) &
        (pair_avail["slot_dalam_jam"] == current["slot_dalam_jam"])
    ]

    if mode == "same_slot":
        recent_observed = observed[observed["slot_dalam_jam"] == current["slot_dalam_jam"]]
    else:
        recent_observed = observed

    probs = []

    for pos in range(1, 5):
        # 1. Frequency of the most recent digits
        p_recent = digit_distribution(
            recent_observed[f"p{pos}"].tail(recent_window),
            alpha=alpha
        )

        # 2. Overall distribution
        p_global = digit_distribution(
            observed[f"p{pos}"],
            alpha=alpha
        )

        # 3. Markov transition: current digit -> next digit
        cur_digit = int(current[f"p{pos}"])
        trans_source = pair_avail[pair_avail[f"p{pos}"] == cur_digit]

        if mode == "same_slot":
            trans_source = trans_source[
                trans_source["slot_dalam_jam"] == current["slot_dalam_jam"]
            ]

        p_transition = digit_distribution(
            trans_source[f"y{pos}"],
            alpha=alpha * 0.7
        )

        # 4. Conditional on time + slot
        p_conditional = digit_distribution(
            cond_avail[f"y{pos}"],
            alpha=alpha * 0.7
        )

        combined = (
            weights["recent"] * p_recent +
            weights["transition"] * p_transition +
            weights["conditional"] * p_conditional +
            weights["global"] * p_global
        )

        combined = combined / combined.sum()
        probs.append(combined)

    return probs


# -------------------------
# 6) Top 4D combinations
# -------------------------
def top_combinations(probs: list[np.ndarray], k: int = 5) -> pd.DataFrame:
    """Return the ``k`` highest-scoring 4-digit numbers.

    The score of a number is the product of its four per-position
    probabilities. ``k`` is capped at 10,000; ``k <= 0`` yields an empty
    frame with the usual columns ``rank``, ``prediksi_4d``, ``score``.
    """
    columns = ["rank", "prediksi_4d", "score"]

    score_grid = (
        probs[0][:, None, None, None] *
        probs[1][None, :, None, None] *
        probs[2][None, None, :, None] *
        probs[3][None, None, None, :]
    )
    # Flat index i corresponds to the number f"{i:04d}" (row-major order).
    flat = score_grid.reshape(-1)

    k = min(k, flat.size)
    if k <= 0:
        # flat[-0:] would select everything, so handle this explicitly.
        return pd.DataFrame(columns=columns)

    idx = np.argpartition(flat, -k)[-k:]
    idx = idx[np.argsort(flat[idx])[::-1]]

    rows = [
        {
            "rank": rank,
            "prediksi_4d": f"{i:04d}",
            "score": float(flat[i])
        }
        for rank, i in enumerate(idx, start=1)
    ]

    return pd.DataFrame(rows, columns=columns)


# -------------------------
# 7) BBFS 5D
# -------------------------
def best_bbfs_digits(probs: list[np.ndarray], size: int = 5) -> tuple[str, float]:
    """Pick the best set of ``size`` base digits.

    BBFS here means the ``size`` best base digits. Score = probability that
    all four positions fall inside the chosen digit set (product of the
    per-position probability mass of the set). The first set reaching the
    maximum, in ``itertools.combinations`` order, wins.

    Returns:
        ``(digits, score)`` where ``digits`` is the set as an ascending string.

    Raises:
        ValueError: If ``size`` is not between 0 and 10.
    """
    if not 0 <= size <= 10:
        raise ValueError("size harus antara 0 dan 10")

    best_digits = ()
    best_score = -1.0

    for comb in itertools.combinations(range(10), size):
        idx = list(comb)
        score = 1.0
        for p in probs:
            score *= float(p[idx].sum())

        if score > best_score:
            best_score = score
            best_digits = comb

    return "".join(map(str, best_digits)), best_score


def top_combinations_from_bbfs(
    probs: list[np.ndarray],
    bbfs_digits: str,
    k: int = 5
) -> pd.DataFrame:
    """Return the ``k`` best 4-digit numbers built only from ``bbfs_digits``.

    Columns: ``rank``, ``prediksi_4d``, ``score``. An empty digit string
    yields an empty frame.
    """
    digits = [int(x) for x in bbfs_digits]

    rows = [
        {
            "prediksi_4d": f"{d1}{d2}{d3}{d4}",
            "score": float(
                probs[0][d1] *
                probs[1][d2] *
                probs[2][d3] *
                probs[3][d4]
            )
        }
        for d1, d2, d3, d4 in itertools.product(digits, repeat=4)
    ]

    # Explicit columns keep sort_values working when there are no rows.
    out = pd.DataFrame(rows, columns=["prediksi_4d", "score"])
    out = out.sort_values("score", ascending=False).head(k).reset_index(drop=True)
    out.insert(0, "rank", np.arange(1, len(out) + 1))
    return out


# -------------------------
# 8) Walk-forward evaluation
# -------------------------
def walk_forward_evaluation(
    df: pd.DataFrame,
    mode: str,
    pair_cache: dict,
    next_index_cache: dict,
    recent_window: int,
    alpha: float,
    weights: dict,
    top_list=(1, 5, 10, 20),
    validation_tail: int = VALIDATION_TAIL,
    validation_stride: int = VALIDATION_STRIDE
) -> dict:
    """Score one configuration on the tail of the history.

    For every ``validation_stride``-th row from
    ``max(MIN_HISTORY, len(df) - validation_tail)`` onwards, predict the
    follower row and compare with the truth.

    Returns:
        A dict with the configuration, ``n_test``, exact-hit rates
        ``top_<k>`` for each ``k`` in ``top_list``, BBFS coverage and
        per-position accuracy of the top-1 prediction. ``weight_name`` is
        left as ``None`` for the caller to fill. If no row could be
        evaluated, a dict with ``n_test == 0`` and an ``error`` message.
    """
    start = max(MIN_HISTORY, len(df) - validation_tail)

    hits = {k: 0 for k in top_list}
    pos_hits = np.zeros(4, dtype=float)
    bbfs5_cover = 0
    n = 0

    next_idx = next_index_cache[mode]

    for row_idx in range(start, len(df), validation_stride):
        target_idx = int(next_idx[row_idx])

        # Rows without a follower have no truth to compare with.
        if target_idx < 0 or target_idx >= len(df):
            continue

        truth = df.loc[target_idx, "result_4d"]

        probs = probabilities_for_row(
            df=df,
            row_idx=row_idx,
            mode=mode,
            pair_cache=pair_cache,
            recent_window=recent_window,
            alpha=alpha,
            weights=weights
        )

        preds = top_combinations(probs, k=max(top_list))["prediksi_4d"].tolist()

        for k in top_list:
            hits[k] += int(truth in preds[:k])

        top1 = preds[0]
        for pos in range(4):
            pos_hits[pos] += int(top1[pos] == truth[pos])

        bbfs_digits, _ = best_bbfs_digits(probs, size=BBFS_SIZE)
        bbfs5_cover += int(all(ch in bbfs_digits for ch in truth))

        n += 1

    if n == 0:
        return {
            "mode": mode,
            "n_test": 0,
            "error": "Tidak ada baris evaluasi."
        }

    result = {
        "mode": mode,
        "n_test": n,
        "recent_window": recent_window,
        "alpha": alpha,
        "weight_name": None,
        "weights": json.dumps(normalize_weights(weights)),
        "bbfs_size": BBFS_SIZE,
        "bbfs5_cover": bbfs5_cover / n,
        "digit_acc_avg": float((pos_hits / n).mean()),
        "digit_acc_p1": float(pos_hits[0] / n),
        "digit_acc_p2": float(pos_hits[1] / n),
        "digit_acc_p3": float(pos_hits[2] / n),
        "digit_acc_p4": float(pos_hits[3] / n),
    }

    for k in top_list:
        result[f"top_{k}"] = hits[k] / n

    return result


# -------------------------
# 9) Parameter auto-tuning
# -------------------------
def auto_tune_mode(
    df: pd.DataFrame,
    mode: str,
    pair_cache: dict,
    next_index_cache: dict
) -> tuple[pd.DataFrame, dict]:
    """Grid-search recent window, alpha and weight set for one mode.

    Returns:
        ``(summary, best_config)``: all evaluated configurations sorted best
        first, and the configuration dict of the winner.

    Raises:
        RuntimeError: If no configuration produced an evaluation result.
    """
    records = []
    configs = {}

    print("\n" + "=" * 70)
    print(f"Auto-tuning mode: {mode}")

    grid = itertools.product(TUNE_RECENT_WINDOWS, TUNE_ALPHAS, WEIGHT_SETS)
    config_id = 0

    for recent_window, alpha, wset in grid:
        weights = wset["weights"]

        res = walk_forward_evaluation(
            df=df,
            mode=mode,
            pair_cache=pair_cache,
            next_index_cache=next_index_cache,
            recent_window=recent_window,
            alpha=alpha,
            weights=weights,
            top_list=(1, 5, 10, 20),
            validation_tail=VALIDATION_TAIL,
            validation_stride=VALIDATION_STRIDE
        )

        if "error" in res:
            continue

        res["config_id"] = config_id
        res["weight_name"] = wset["name"]

        # Selection score:
        # exact Top 5 hits dominate, followed by Top 10, Top 20,
        # BBFS coverage and digit accuracy.
        res["selection_score"] = (
            res["top_5"] * 1000 +
            res["top_10"] * 100 +
            res["top_20"] * 10 +
            res["bbfs5_cover"] * 2 +
            res["digit_acc_avg"]
        )

        records.append(res)
        configs[config_id] = {
            "config_id": config_id,
            "mode": mode,
            "recent_window": recent_window,
            "alpha": alpha,
            "weight_name": wset["name"],
            "weights": weights
        }

        config_id += 1

    summary = pd.DataFrame(records)

    if summary.empty:
        raise RuntimeError(f"Tuning gagal untuk mode {mode}: tidak ada hasil evaluasi.")

    summary = summary.sort_values(
        ["selection_score", "top_5", "top_10", "bbfs5_cover", "digit_acc_avg"],
        ascending=False
    ).reset_index(drop=True)

    best_config = configs[int(summary.iloc[0]["config_id"])]

    print("\nBest config:")
    print(summary.head(5)[[
        "mode",
        "n_test",
        "recent_window",
        "alpha",
        "weight_name",
        "top_1",
        "top_5",
        "top_10",
        "top_20",
        "bbfs5_cover",
        "digit_acc_avg",
        "selection_score"
    ]])

    return summary, best_config


# -------------------------
# 10) Final prediction
# -------------------------
def make_prediction_package(
    df: pd.DataFrame,
    row_idx: int,
    mode: str,
    config: dict,
    pair_cache: dict,
    label: str
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Produce the prediction tables for the row following ``row_idx``.

    Returns:
        ``(top5, bbfs_top5, digit_probs)``: the overall top combinations, the
        top combinations restricted to the best BBFS digits, and the ranked
        per-position digit probabilities, each tagged with ``label``/``mode``.
    """
    probs = probabilities_for_row(
        df=df,
        row_idx=row_idx,
        mode=mode,
        pair_cache=pair_cache,
        recent_window=config["recent_window"],
        alpha=config["alpha"],
        weights=config["weights"]
    )

    top5 = top_combinations(probs, k=TOP_K_FINAL)
    top5.insert(0, "label", label)
    top5.insert(1, "mode", mode)
    top5.insert(2, "recent_window", config["recent_window"])
    top5.insert(3, "alpha", config["alpha"])
    top5.insert(4, "weight_name", config["weight_name"])

    bbfs_digits, bbfs_score = best_bbfs_digits(probs, size=BBFS_SIZE)
    bbfs_top5 = top_combinations_from_bbfs(
        probs=probs,
        bbfs_digits=bbfs_digits,
        k=TOP_K_FINAL
    )

    bbfs_top5.insert(0, "label", label)
    bbfs_top5.insert(1, "mode", mode)
    bbfs_top5.insert(2, "bbfs_5d", bbfs_digits)
    bbfs_top5.insert(3, "bbfs_score", float(bbfs_score))
    bbfs_top5.insert(4, "recent_window", config["recent_window"])
    bbfs_top5.insert(5, "alpha", config["alpha"])
    bbfs_top5.insert(6, "weight_name", config["weight_name"])

    digit_rows = []
    for pos, p in enumerate(probs, start=1):
        ranked = sorted(
            [(d, float(p[d])) for d in range(10)],
            key=lambda x: x[1],
            reverse=True
        )

        for rank, (digit, prob) in enumerate(ranked, start=1):
            digit_rows.append({
                "label": label,
                "mode": mode,
                "posisi": pos,
                "rank": rank,
                "digit": digit,
                "probability": prob
            })

    digit_probs = pd.DataFrame(digit_rows)

    return top5, bbfs_top5, digit_probs


# -------------------------
# 11) Main
# -------------------------
# Guarded so importing this file does not trigger the upload, the tuning run
# or any file writes. In Colab / as a script the guard is true and all names
# below stay module-level globals, as before.
if __name__ == "__main__":
    CSV_PATH = _resolve_csv_path()

    df = load_data(CSV_PATH)

    print(f"Data terbaca: {len(df):,} baris")
    print("Rentang waktu:", df["datetime"].min(), "sampai", df["datetime"].max())
    print("\nBaris terakhir:")
    print(df.tail(1)[["datetime", "waktu", "slot_dalam_jam", "result_4d"]])

    pair_cache = {
        "global": prepare_pairs(df, "global"),
        "same_slot": prepare_pairs(df, "same_slot")
    }

    next_index_cache = {
        "global": build_next_index(df, "global"),
        "same_slot": build_next_index(df, "same_slot")
    }

    # Auto-tune each mode separately
    summary_global, best_global = auto_tune_mode(
        df=df,
        mode="global",
        pair_cache=pair_cache,
        next_index_cache=next_index_cache
    )

    summary_same_slot, best_same_slot = auto_tune_mode(
        df=df,
        mode="same_slot",
        pair_cache=pair_cache,
        next_index_cache=next_index_cache
    )

    summary_all = pd.concat([summary_global, summary_same_slot], ignore_index=True)
    summary_all = summary_all.sort_values(
        ["selection_score", "top_5", "top_10", "bbfs5_cover", "digit_acc_avg"],
        ascending=False
    ).reset_index(drop=True)

    summary_all.to_csv("tuning_summary_top5_bbfs5d.csv", index=False)

    print("\n" + "=" * 70)
    print("Ringkasan tuning terbaik semua mode:")
    print(summary_all.head(10)[[
        "mode",
        "n_test",
        "recent_window",
        "alpha",
        "weight_name",
        "top_1",
        "top_5",
        "top_10",
        "top_20",
        "bbfs5_cover",
        "digit_acc_avg",
        "selection_score"
    ]])

    # Re-evaluate the best configs with stride 1 for a more stable report
    print("\n" + "=" * 70)
    print("Evaluasi final config terbaik, stride 1:")

    final_eval_global = walk_forward_evaluation(
        df=df,
        mode="global",
        pair_cache=pair_cache,
        next_index_cache=next_index_cache,
        recent_window=best_global["recent_window"],
        alpha=best_global["alpha"],
        weights=best_global["weights"],
        top_list=(1, 5, 10, 20),
        validation_tail=VALIDATION_TAIL,
        validation_stride=1
    )
    # The evaluator leaves weight_name empty; record which set was evaluated.
    final_eval_global["weight_name"] = best_global["weight_name"]

    final_eval_same_slot = walk_forward_evaluation(
        df=df,
        mode="same_slot",
        pair_cache=pair_cache,
        next_index_cache=next_index_cache,
        recent_window=best_same_slot["recent_window"],
        alpha=best_same_slot["alpha"],
        weights=best_same_slot["weights"],
        top_list=(1, 5, 10, 20),
        validation_tail=VALIDATION_TAIL,
        validation_stride=1
    )
    final_eval_same_slot["weight_name"] = best_same_slot["weight_name"]

    print("\nGLOBAL:")
    print(final_eval_global)

    print("\nSAME_SLOT:")
    print(final_eval_same_slot)

    pd.DataFrame([final_eval_global, final_eval_same_slot]).to_csv(
        "final_evaluation_top5_bbfs5d.csv",
        index=False
    )

    # -------------------------
    # 12) GLOBAL prediction after the last row
    # -------------------------
    last_idx = len(df) - 1

    print("\n" + "=" * 70)
    print("Prediksi TOP 5 GLOBAL setelah baris terakhir")
    print("Best global config:", best_global)

    top5_global, bbfs_global, digit_probs_global = make_prediction_package(
        df=df,
        row_idx=last_idx,
        mode="global",
        config=best_global,
        pair_cache=pair_cache,
        label="next_global_after_last_row"
    )

    print("\nTOP 5 GLOBAL:")
    print(top5_global)

    print("\nBBFS 5D GLOBAL:")
    print(bbfs_global)

    top5_global.to_csv("prediksi_top5_global.csv", index=False)
    bbfs_global.to_csv("bbfs5d_global.csv", index=False)
    digit_probs_global.to_csv("digit_probabilities_global.csv", index=False)

    # -------------------------
    # 13) SAME_SLOT prediction for the slots at the last datetime
    # -------------------------
    print("\n" + "=" * 70)
    print("Prediksi TOP 5 SAME_SLOT untuk slot-slot pada datetime terakhir")
    print("Best same_slot config:", best_same_slot)

    last_datetime = df["datetime"].max()
    last_rows = df[df["datetime"] == last_datetime].copy()

    all_top5_slot = []
    all_bbfs_slot = []
    all_digit_probs_slot = []

    for idx, row in last_rows.iterrows():
        label = (
            f"next_same_slot_slot_{row['slot_dalam_jam']}"
            f"_after_{row['datetime']}"
        )

        print("\n" + "-" * 70)
        print(
            "Slot:",
            row["slot_dalam_jam"],
            "| waktu:",
            row["waktu"],
            "| result terakhir:",
            row["result_4d"]
        )

        top5_slot, bbfs_slot, digit_probs_slot = make_prediction_package(
            df=df,
            row_idx=int(idx),
            mode="same_slot",
            config=best_same_slot,
            pair_cache=pair_cache,
            label=label
        )

        top5_slot.insert(0, "slot_dalam_jam", row["slot_dalam_jam"])
        top5_slot.insert(1, "last_result_4d", row["result_4d"])

        bbfs_slot.insert(0, "slot_dalam_jam", row["slot_dalam_jam"])
        bbfs_slot.insert(1, "last_result_4d", row["result_4d"])

        digit_probs_slot.insert(0, "slot_dalam_jam", row["slot_dalam_jam"])
        digit_probs_slot.insert(1, "last_result_4d", row["result_4d"])

        print("\nTOP 5 SAME_SLOT:")
        print(top5_slot)

        print("\nBBFS 5D SAME_SLOT:")
        print(bbfs_slot)

        all_top5_slot.append(top5_slot)
        all_bbfs_slot.append(bbfs_slot)
        all_digit_probs_slot.append(digit_probs_slot)

    pred_top5_same_slot = pd.concat(all_top5_slot, ignore_index=True)
    pred_bbfs_same_slot = pd.concat(all_bbfs_slot, ignore_index=True)
    pred_digit_probs_same_slot = pd.concat(all_digit_probs_slot, ignore_index=True)

    pred_top5_same_slot.to_csv("prediksi_top5_same_slot.csv", index=False)
    pred_bbfs_same_slot.to_csv("bbfs5d_same_slot.csv", index=False)
    pred_digit_probs_same_slot.to_csv("digit_probabilities_same_slot.csv", index=False)

    # -------------------------
    # 14) Output files
    # -------------------------
    print("\n" + "=" * 70)
    print("File output dibuat:")
    print("- tuning_summary_top5_bbfs5d.csv")
    print("- final_evaluation_top5_bbfs5d.csv")
    print("- prediksi_top5_global.csv")
    print("- bbfs5d_global.csv")
    print("- digit_probabilities_global.csv")
    print("- prediksi_top5_same_slot.csv")
    print("- bbfs5d_same_slot.csv")
    print("- digit_probabilities_same_slot.csv")

    print("\nCatatan:")
    print("1. TOP 5 = 5 angka 4D dengan skor probabilitas tertinggi.")
    print("2. BBFS 5D = 5 digit basis terbaik, lalu script tampilkan Top 5 kombinasi 4D dari digit tersebut.")
    print("3. Gunakan hasil sebagai ranking statistik, bukan kepastian.")

Bagian paling penting yang berubah: script sekarang memilih parameter terbaik otomatis dari histori, lalu baru membuat Top 5 final dan BBFS 5D final.

Share this Q&A