SynxML Python
SynxML Python is a Python SDK for data manipulation and machine learning on SynxML. It offers two main interfaces: a Ray-based interface for users familiar with Ray, and a DataFrame interface for general users.
Prerequisites
Before using SynxML Python, ensure the following resources are set up:
Create organization, account, and user/role via the DBaaS Admin Console.
Create a warehouse.
Create an ML cluster.
For details, see Use DBaaS Admin Console to create resources.
Prepare database
Ensure you have a database created and the synxml extension enabled.
Create a database (for example, testdb) and connect to it:
CREATE DATABASE IF NOT EXISTS testdb;
\c testdb
Create and configure the extension (for example, using the synxml_auth role):
CREATE EXTENSION IF NOT EXISTS synxml CASCADE;
SELECT synxml.configure_auth_role('synxml_auth');
Enter the SynxML Python environment
To access the SynxML Python environment, you need to connect to the Kubernetes pod running the ML cluster. Follow these steps:
Access the Kubernetes cluster: Log in to the Kubernetes environment where SynxDB Cloud is deployed.
Identify the ML cluster pod: Run the following command to list all pods and find the one associated with your ML cluster.
kubectl get po -A
Locate the pod name (for example, mlc1--worker-d7mqd) and its namespace (for example, org1-usr2-70b2facb).
Enter the pod: Use the kubectl exec command to open a bash shell in the ML cluster pod.
kubectl exec -it <pod_name> -n <namespace> -- bash
For example:
kubectl exec -it mlc1--worker-d7mqd -n org1-usr2-70b2facb -- bash
Start Python: After a successful login, you will see the Ray container prompt. Start the Python interpreter.
python3
You should see an output similar to:
Defaulted container "ray" out of: ray, wait-gcs-ready (init) (base) ray@mlc1--worker-d7mqd:~$ python3
You are now in the SynxML Python environment.
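Optionally, verify that the packages used throughout this guide import cleanly:
import ray
import synxml
print(ray.__version__)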
Configure database
In the Python interpreter, connect to your database, enable the synxml extension, and configure it. If your environment needs a proxy for outbound network access (for example, to download datasets and models), set the proxy environment variables as well:
import psycopg2
# Database connection (update with your credentials)
# Replace <ip>, <port>, and <database_name> with your actual values
DB_URI = "postgresql://gpadmin@<ip>:<port>/<database_name>"
with psycopg2.connect(DB_URI) as conn:
with conn.cursor() as cur:
cur.execute("CREATE EXTENSION IF NOT EXISTS synxml CASCADE")
cur.execute("SELECT synxml.configure_extension()")
import os
os.environ["https_proxy"] = "http://<ip>:<port>"
os.environ["http_proxy"] = "http://<ip>:<port>"
Load and preprocess data
SynxML provides powerful data loading and preprocessing capabilities.
Load data from Hugging Face datasets
import ray
import pandas as pd
from datasets import load_dataset
from synxml.data import read_dataset, write_dataset
print("Loading sample datasets...")
# Load Iris dataset
iris_data = load_dataset('scikit-learn/iris')
iris_train = iris_data['train']
iris_train_ds = ray.data.from_huggingface(iris_train)
# Load California Housing dataset
housing_data = load_dataset('gvlassis/california_housing')
housing_train = housing_data['train']
housing_test = housing_data['test']
housing_train_ds = ray.data.from_huggingface(housing_train)
housing_test_ds = ray.data.from_huggingface(housing_test)
# Load ChnSentiCorp dataset
sentiment_data = load_dataset('lansinuote/ChnSentiCorp')
sentiment_train = sentiment_data['train']
sentiment_test = sentiment_data['test']
sentiment_train_ds = ray.data.from_huggingface(sentiment_train)
sentiment_test_ds = ray.data.from_huggingface(sentiment_test)
# Print dataset shapes
print(f"Iris dataset shape: {iris_train_ds.count()} rows")
print(f"Housing dataset shape: {housing_train_ds.count()} rows")
print(f"Housing dataset shape: {housing_test_ds.count()} rows")
print(f"ChnSentiCorp dataset shape: {sentiment_train_ds.count()} rows")
print(f"ChnSentiCorp dataset shape: {sentiment_test_ds.count()} rows")
# Save to database
write_dataset(iris_train_ds, "iris_train", DB_URI)
write_dataset(housing_train_ds, "california_housing_train", DB_URI)
write_dataset(housing_test_ds, "california_housing_test", DB_URI)
write_dataset(sentiment_train_ds, "chnsenti_train", DB_URI)
write_dataset(sentiment_test_ds, "chnsenti_test", DB_URI)
print("Datasets saved to database!")
Load data from torchvision
import torch
from torchvision import datasets
from torchvision.transforms import ToTensor
def prepare_fashion_mnist(train=True):
"""Prepare FashionMNIST dataset"""
data = datasets.FashionMNIST(
root="~/data",
train=train,
download=True,
transform=ToTensor()
)
# Convert to pandas DataFrame
df = pd.DataFrame(list(data), columns=['X', 'y'])
# Convert tensors to numpy arrays
df['X'] = df['X'].apply(lambda r: r.to(torch.float64).numpy())
return ray.data.from_pandas(df)
fashion_train = prepare_fashion_mnist(train=True)
fashion_test = prepare_fashion_mnist(train=False)
print(f"FashionMNIST train: {fashion_train.count()} samples")
print(f"FashionMNIST test: {fashion_test.count()} samples")
write_dataset(fashion_train, 'fashionmnist_train', DB_URI)
write_dataset(fashion_test, 'fashionmnist_test', DB_URI)
print("FashionMNIST datasets saved!")
Alternatively, you can store the images as PNG-encoded bytes instead of raw tensors:
import torch
from torchvision import datasets
from torchvision.transforms import ToTensor
import pandas as pd
import numpy as np
import io
from PIL import Image
import ray
from synxml.data import read_dataset, write_dataset
def prepare_fashion_mnist_bytes(train=True):
data = datasets.FashionMNIST(
root="~/data",
train=train,
download=True,
transform=ToTensor()
)
# Convert to pandas DataFrame
df = pd.DataFrame(list(data), columns=['X', 'y'])
# Transform tensor to bytes
def tensor_to_bytes(tensor):
img_array = (tensor.squeeze().numpy() * 255).astype(np.uint8)
img = Image.fromarray(img_array)
img_bytes = io.BytesIO()
img.save(img_bytes, format='PNG')
return img_bytes.getvalue()
df['X'] = df['X'].apply(tensor_to_bytes)
return ray.data.from_pandas(df)
fashion_train = prepare_fashion_mnist_bytes(train=True)
fashion_test = prepare_fashion_mnist_bytes(train=False)
print(f"FashionMNIST train: {fashion_train.count()} samples")
print(f"FashionMNIST test: {fashion_test.count()} samples")
# Write train and test to db
write_dataset(fashion_train, 'fashionmnist_train_bytes', DB_URI)
write_dataset(fashion_test, 'fashionmnist_test_bytes', DB_URI)
# Sample 1% from fashion_train
fashion_sample = fashion_train.random_sample(fraction=0.01, seed=42)
print(f"FashionMNIST sample: {fashion_sample.count()} samples")
# Write sample to db
write_dataset(fashion_sample, 'fashionmnist_sample_bytes', DB_URI)
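As an optional round-trip check, take one row and decode the PNG bytes back into an image (PIL and io are imported above):
row = fashion_sample.take(1)[0]
img = Image.open(io.BytesIO(row['X']))  # decode the stored PNG bytes
print(img.size, "label:", row['y'])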
Transform and preprocess data
iris_dataset = read_dataset('iris_train', DB_URI)
print("Original Iris dataset:")
iris_dataset.show(3)
# Define transformation function
def transform_iris(batch):
"""Transform Iris dataset"""
# Map species to numeric values
species_map = {
'Iris-setosa': 0,
'Iris-versicolor': 1,
'Iris-virginica': 2
}
batch['Species_encoded'] = batch['Species'].map(species_map)
# Create new features
batch['Sepal_ratio'] = batch['SepalLengthCm'] / batch['SepalWidthCm']
batch['Petal_ratio'] = batch['PetalLengthCm'] / batch['PetalWidthCm']
# Drop original Species column
batch = batch.drop(columns=['Species'])
return batch
# Apply transformation
iris_transformed = iris_dataset.map_batches(transform_iris, batch_format="pandas")
print("\nTransformed Iris dataset:")
iris_transformed.show(3)
# Save transformed data
write_dataset(iris_transformed, "iris_processed", DB_URI)
print("Transformed data saved!")
Use traditional ML models
SynxML provides a comprehensive suite of traditional machine learning models through sklearn integration.
Support vector machine (SVM)
An example of training:
from synxml.models import SVC

svc = SVC(
    C=1.0,
    kernel='rbf',
    gamma='scale',
    probability=True,  # Enable probability estimates
    random_state=42
)
print("Training SVM on Iris dataset...")
svc.fit(
    train_tblname='iris_processed',
    model_name='iris_svm_classifier',
    train_config={
        'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
        'y': 'Species_encoded',
        'sample_weight': None
    },
    db_uri=DB_URI
)
print("SVM training completed!")
print("Model saved as: iris_svm_classifier")
print(f"Model Info: {svc.model}")
An example of prediction:
from synxml.models import SVC

svc = SVC.from_model_name('iris_svm_classifier', db_uri_models=DB_URI)
print("Making predictions with SVC...")
svc.batch_predict(
    input_tblname='iris_processed',
    output_tblname='iris_svm_predictions',
    predict_config={
        'y': 'Species_encoded',
        'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio'],
        'keep_columns': ['Id']
    },
    db_uri=DB_URI
)
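To eyeball prediction quality, you can pull the output table back into pandas. A minimal sketch using plain SQL; inspect the columns first, since the exact prediction column name may vary by build:
import pandas as pd
import psycopg2

with psycopg2.connect(DB_URI) as conn:
    df = pd.read_sql("SELECT * FROM iris_svm_predictions LIMIT 5", conn)
print(df.columns.tolist())
print(df.head())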
Use logistic regression
An example of training:
from synxml.models import LogisticRegression

logreg = LogisticRegression(
    penalty='l2',
    C=1.0,
    solver='lbfgs',
    multi_class='auto',
    max_iter=1000,
    random_state=42
)
print("Training Logistic Regression on Iris dataset...")
logreg.fit(
    train_tblname='iris_processed',
    model_name='iris_logreg_classifier',
    train_config={
        'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
        'y': 'Species_encoded',
        'sample_weight': None
    },
    db_uri=DB_URI
)
print("Logistic Regression training completed!")
print("Model saved as: iris_logreg_classifier")
print(f"Model Info: {logreg.model}")
An example of prediction:
from synxml.models import LogisticRegression

logreg = LogisticRegression.from_model_name('iris_logreg_classifier', db_uri_models=DB_URI)
print("Making predictions with LogisticRegression...")
logreg.batch_predict(
    input_tblname='iris_processed',
    output_tblname='iris_logreg_predictions',
    predict_config={
        'y': 'Species_encoded',
        'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio'],
        'keep_columns': ['Id']
    },
    db_uri=DB_URI
)
Use AdaBoost classifier
An example of training:
from synxml.models import AdaBoostClassifier

adaboost = AdaBoostClassifier(
    n_estimators=50,
    learning_rate=1.0,
    algorithm='SAMME',
    random_state=42
)
print("Training AdaBoost on Iris dataset...")
adaboost.fit(
    train_tblname='iris_processed',
    model_name='iris_adaboost_classifier',
    train_config={
        'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
        'y': 'Species_encoded',
        'sample_weight': None
    },
    db_uri=DB_URI
)
print("AdaBoost training completed!")
print("Model saved as: iris_adaboost_classifier")
print(f"Model Info: {adaboost.model}")
An example of prediction:
from synxml.models import AdaBoostClassifier

adaboost = AdaBoostClassifier.from_model_name('iris_adaboost_classifier', db_uri_models=DB_URI)
print("Making predictions with AdaBoostClassifier...")
adaboost.batch_predict(
    input_tblname='iris_processed',
    output_tblname='iris_adaboost_predictions',
    predict_config={
        'y': 'Species_encoded',
        'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio'],
        'keep_columns': ['Id']
    },
    db_uri=DB_URI
)
Use random forest classifier
An example of training:
from synxml.models import RandomForestClassifier

rfc = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    random_state=42
)
print("Training Random Forest classifier...")
rfc.fit(
    train_tblname='iris_processed',
    model_name='iris_rf_classifier',
    train_config={
        'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
        'y': 'Species_encoded',
        'num_workers': 4,
        'use_gpu': False,
    },
    db_uri=DB_URI
)
print("Random Forest training completed!")
print("Model saved as: iris_rf_classifier")
print(f"Model Info: {rfc.model}")
An example of prediction:
from synxml.models import RandomForestClassifier

rfc = RandomForestClassifier.from_model_name('iris_rf_classifier', db_uri_models=DB_URI)
print("Making predictions with Random Forest...")
rfc.batch_predict(
    input_tblname='iris_processed',
    output_tblname='iris_rf_predictions',
    predict_config={
        'y': 'Species_encoded',
        'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio'],
        'keep_columns': ['Id']
    },
    db_uri=DB_URI
)
print("Predictions completed!")
print("Results saved to: iris_rf_predictions")
Use KMeans clustering
An example of training:
from synxml.models import KMeans

kms = KMeans(n_clusters=3, init='k-means++', algorithm='lloyd')
kms.fit(
    train_tblname='iris_processed',
    modelname='kms_iris',
    train_config={
        # Clustering is unsupervised, so no 'y' label is needed
        'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
        'num_workers': 1,
        'use_gpu': False,
        'scoring': 'davies_bouldin',
    },
    db_uri=DB_URI
)
An example of prediction:
from synxml.models import KMeans

kms = KMeans.from_model_name('lloyd_a488af3bf68d4a578030ccb0cfd792e8', db_uri_models=DB_URI)  # replace with your model name
kms.batch_predict(
    input_tblname='iris_processed',
    output_tblname='iris_kms_predictions',
    predict_config={
        'y': 'Species_encoded',
        'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio', 'Species_encoded'],
        'keep_columns': ['Id']
    },
    db_uri=DB_URI
)
print("Predictions completed!")
print("Results saved to: iris_kms_predictions")
Use DBSCAN clustering
An example of training:
from synxml.models import DBSCAN

dbs = DBSCAN()
dbs.fit(
    train_tblname='iris_processed',
    modelname='dbs_iris',
    train_config={
        # Clustering is unsupervised, so no 'y' label is needed
        'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
        'num_workers': 1,
        'use_gpu': False,
        'scoring': 'davies_bouldin',
    },
    db_uri=DB_URI
)
Use MeanShift clustering
An example of training:
from synxml.models import MeanShift

msf = MeanShift()
msf.fit(
    train_tblname='iris_processed',
    modelname='msf_iris',
    train_config={
        'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
        'num_workers': 1,
        'use_gpu': False,
    },
    db_uri=DB_URI
)
An example of prediction:
from synxml.models import MeanShift

msf = MeanShift.from_model_name('MeanShift_80d6ee0647c64c94b98ee81ed066409c', db_uri_models=DB_URI)  # replace with your model name
msf.batch_predict(
    input_tblname='iris_processed',
    output_tblname='iris_msf_predictions',
    predict_config={
        'y': 'Species_encoded',
        'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio', 'Species_encoded'],
        'keep_columns': ['Id']
    },
    db_uri=DB_URI
)
print("Predictions completed!")
print("Results saved to: iris_msf_predictions")
Use SpectralClustering
An example of training:
from synxml.models import SpectralClustering

opt = SpectralClustering()
opt.fit(
    train_tblname='iris_processed',
    modelname='opt_iris',
    train_config={
        'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
        'num_workers': 1,
        'use_gpu': False,
    },
    db_uri=DB_URI
)
An example of prediction:
from synxml.models import SpectralClustering

opt = SpectralClustering.from_model_name('SpectralClustering_80d6ee0647c64c94b98ee81ed066409c', db_uri_models=DB_URI)  # replace with your model name
opt.batch_predict(
    input_tblname='iris_processed',
    output_tblname='iris_opt_predictions',
    predict_config={
        'y': 'Species_encoded',
        'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio', 'Species_encoded'],
        'keep_columns': ['Id']
    },
    db_uri=DB_URI
)
print("Predictions completed!")
print("Results saved to: iris_opt_predictions")
Use XGBoost classification
An example of training:
from synxml.models import XGBoost

# Initialize XGBoost classifier
xgb = XGBoost()
# Train on the processed Iris data
print("Training XGBoost classifier on Iris data...")
xgb.fit(
    train_tblname='iris_processed',
    model_name='iris_xgb_classifier',
    train_config={
        'objective': 'multi:softmax',
        'num_class': 3,
        'num_boost_round': 100,
        'early_stopping_rounds': 10,
        'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
        'y': 'Species_encoded',
        'num_workers': 1,
        'use_gpu': False,
    },
    db_uri=DB_URI
)
print("XGBoost training completed!")
print("Model saved as: iris_xgb_classifier")
print(f"Model Info: {xgb.model.attributes()}")
An example of prediction:
from synxml.models import XGBoost

xgb = XGBoost.from_model_name('iris_xgb_classifier', db_uri_models=DB_URI)
print("Making predictions with XGBoost...")
xgb.batch_predict(
    input_tblname='iris_processed',
    output_tblname='iris_xgb_predictions',
    predict_config={
        'y': 'Species_encoded',
        'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio'],
        'keep_columns': ['Id']
    },
    db_uri=DB_URI
)
print("Predictions completed!")
print("Results saved to: iris_xgb_predictions")
Use XGBoost regression
An example of training:
from synxml.models import XGBoost

xgb = XGBoost()
print("Training XGBoost regressor on California Housing data...")
xgb.fit(
    train_tblname='california_housing_train',
    model_name='california_housing_xgb_regressor',
    train_config={
        'objective': 'reg:squarederror',
        'y': 'MedHouseVal',
        'num_boost_round': 100,
        'early_stopping_rounds': 10,
        'num_workers': 1
    },
    db_uri=DB_URI,
    valid_tblname='california_housing_test'
)
print("XGBoost training completed!")
print("Model saved as: california_housing_xgb_regressor")
print(f"Model Info: {xgb.model.attributes()}")
An example of prediction:
from synxml.models import XGBoost

xgb = XGBoost.from_model_name('california_housing_xgb_regressor', db_uri_models=DB_URI)
print("Making predictions with XGBoost...")
xgb.batch_predict(
    input_tblname='california_housing_test',
    output_tblname='california_housing_xgb_predictions',
    predict_config={
        'y': 'MedHouseVal'
    },
    db_uri=DB_URI
)
print("Predictions completed!")
print("Results saved to: california_housing_xgb_predictions")
Use LightGBM classification
An example of training:
from synxml.models import LightGBM

lgb = LightGBM()
print("Training LightGBM classifier...")
lgb.fit(
    train_tblname='iris_processed',
    model_name='iris_lgb_classifier',
    train_config={
        'objective': 'multiclass',
        'num_class': 3,
        'X': ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
        'y': 'Species_encoded',
        'num_iterations': 100,
        'learning_rate': 0.1,
        'num_workers': 2
    },
    db_uri=DB_URI
)
print("LightGBM training completed!")
print("Model saved as: iris_lgb_classifier")
print(f"Model Info: {lgb.model}")
An example of prediction:
from synxml.models import LightGBM

lgb = LightGBM.from_model_name('iris_lgb_classifier', db_uri_models=DB_URI)
print("Making predictions with LightGBM...")
lgb.batch_predict(
    input_tblname='iris_processed',
    output_tblname='iris_lgb_predictions',
    predict_config={
        'y': 'Species_encoded',
        'drop_columns': ['Id', 'Petal_ratio', 'Sepal_ratio'],
        'keep_columns': ['Id']
    },
    db_uri=DB_URI
)
print("Predictions completed!")
print("Results saved to: iris_lgb_predictions")
Use CatBoost classification
An example of training:
DB_URI = "postgresql://gpadmin@10.13.10.191:7000/synxml_demo" import ray ray.data.DataContext.get_current().use_ray_tqdm = False
from synxml.models import CatBoost

ctb = CatBoost()
print("Training CatBoost classifier...")
ctb.fit(
    train_tblname='iris_processed',
    model_name='iris_ctb_classifier',
    train_config={
        'iterations': 100,
        'learning_rate': 0.01,
        'loss_function': 'MultiClass',
        'depth': 3,
        'use_gpu': False,
        'X': ['Id', 'SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
        'y': 'Species_encoded',
        'cat_features': ['Id']
    },
    db_uri=DB_URI
)
print("CatBoostClassifier training completed!")
print("Model saved as: iris_ctb_classifier")
print(f"Model Info: {ctb.model}")
An example of prediction:
DB_URI = "postgresql://gpadmin@10.13.10.191:7000/synxml_demo" import ray ray.data.DataContext.get_current().use_ray_tqdm = False
from synxml.models import CatBoost

ctb = CatBoost.from_model_name('iris_ctb_classifier', db_uri_models=DB_URI)
print("Making predictions with CatBoostClassifier...")
ctb.batch_predict(
    input_tblname='iris_processed',
    output_tblname='iris_ctb_predictions',
    predict_config={
        'y': 'Species_encoded',
        'drop_columns': ['Petal_ratio', 'Sepal_ratio'],
        'keep_columns': ['Id']
    },
    db_uri=DB_URI
)
print("Predictions completed!")
print("Results saved to: iris_ctb_predictions")
Use deep learning models
SynxML supports PyTorch-based deep learning models for complex tasks.
Use multi-layer perceptron (MLP)
An example of training:
from synxml.models import MLP

mlp = MLP(
    input_size=784,            # 28x28 flattened images
    hidden_config=[256, 128],  # Two hidden layers
    output_size=10,            # 10 fashion categories
    activation='relu',
    dropout=0.2
)
print("Training MLP on FashionMNIST...")
mlp.fit(
    train_tblname='fashionmnist_train',
    model_name='fashion_mlp_classifier',
    train_config={
        'X': 'X',
        'y': 'y',
        'num_epochs': 5,
        'num_workers': 4,
        'use_gpu': False,
        'per_device_batch_size': 64,
        'learning_rate': 0.001
    },
    db_uri=DB_URI
)
print("MLP training completed!")
print("Model saved as: fashion_mlp_classifier")
An example of prediction:
from synxml.models import MLP

mlp = MLP.from_model_name('fashion_mlp_classifier', db_uri_models=DB_URI)
print("Making predictions with MLP...")
mlp.batch_predict(
    input_tblname='fashionmnist_test',
    output_tblname='fashion_mlp_classifier',
    predict_config={
        'y': 'y',
        'compute_label': True
    },
    db_uri=DB_URI
)
print("Predictions completed!")
print("Results saved to: fashion_mlp_classifier")
Use convolutional neural network (ResNet)
An example of training:
from synxml.models import ResNet

# Initialize model
resnet = ResNet(
    pretrained_model_name="resnet-50",
)

# Training config
train_config = {
    'X': 'X',
    'y': 'y',
    'num_class': 10,
    'num_train_epochs': 1,
    'learning_rate': 2e-5,
    'per_device_train_batch_size': 8,
    'use_gpu': False
}

# Start training
resnet.fit(
    train_tblname='fashionmnist_sample_bytes',
    model_name='fashion_resnet_classifier',
    train_config=train_config,
    valid_tblname='fashionmnist_sample_bytes',
    db_uri=DB_URI
)
An example of prediction:
from synxml.models import ResNet

resnet = ResNet.from_model_name('fashion_resnet_classifier', db_uri_models=DB_URI)
print("Making predictions with ResNet...")
resnet.batch_predict(
    input_tblname='fashionmnist_sample_bytes',
    output_tblname='fashion_resnet_classifier',
    predict_config={
        'y': 'y',
    },
    db_uri=DB_URI
)
print("Predictions completed!")
print("Results saved to: fashion_resnet_classifier")
Use bidirectional encoder representations from transformers (BERT)
An example of training:
from synxml.models import Bert

# Initialize model
bert = Bert(
    pretrained_model_name="bert-base-chinese",
)

# Training config
train_config = {
    'X': 'review',
    'y': 'label',
    'num_class': 2,
    'num_train_epochs': 1,
    'learning_rate': 2e-5,
    'per_device_train_batch_size': 32,
    'use_gpu': False
}

bert.fit(
    train_tblname='waimai_mini_train',
    model_name='waimai_bert_classifier',
    train_config=train_config,
    valid_tblname='waimai_mini_val',
    db_uri=DB_URI
)
An example of prediction:
from synxml.models import Bert

bert = Bert.from_model_name('waimai_bert_classifier', db_uri_models=DB_URI)
print("Making predictions with Bert...")
bert.batch_predict(
    input_tblname='waimai_mini_val',
    output_tblname='waimai_mini_val_pred',
    predict_config={
        'y': 'label',
        'X': 'review'
    },
    db_uri=DB_URI
)
print("Predictions completed!")
print("Results saved to: waimai_mini_val_pred")
Use timeseries modeling: RNNBlockRegressor
An example of training:
from synxml.timeseries.models import RNNBlockRegressor

rnn = RNNBlockRegressor(
    in_chunk_len=96,
    out_chunk_len=24,
    rnn_type_or_module="RNN",
    hidden_size=128,
    embedding_size=64,
    num_layers_recurrent=1
)
rnn.fit(
    train_tblname="power_train",
    train_config={
        "num_epochs": 10,
        "num_worker": 10,
        "use_gpu": False,
        "batch_size": 32,
        "metrics": ["mse", "mae"]
    },
    variable_config={
        # Target variables and covariate variables are supported
        "time_col": "timestamp",
        "target_cols": ["power"],
        "observed_cov_cols": ["voltage"],
        "known_cov_cols": ["minute", "hour"]
    },
    valid_tblname="power_val",
    db_uri=DB_URI
)
An example of forecasting:
from synxml.timeseries.models import RNNBlockRegressor

rnn = RNNBlockRegressor.from_model_name(
    "RNNBlockRegressor_18c997e6716b4f56bfebd4f7061a2c46",  # replace with your model name
    db_uri_models=DB_URI
)
pred_scaled = rnn.predict(
    input_tblname="power_val",
    output_tblname="power_predictions",
    db_uri=DB_URI
)
print("Forecasting completed.")
print(pred_scaled)
Use timeseries modeling: DLinearRegressor
An example of training:
from synxml.timeseries.models import DLinearRegressor

dlinear = DLinearRegressor(
    in_chunk_len=96,
    out_chunk_len=24,
    individual=False,
    kernel_size=25
)
dlinear.fit(
    train_tblname="power_train",
    train_config={
        "num_epochs": 10,
        "num_worker": 10,
        "use_gpu": False,
        "batch_size": 32,
        "metrics": ["mse", "mae"]
    },
    variable_config={
        "time_col": "timestamp",
        "target_cols": ["power"],  # only target variables are supported
    },
    valid_tblname="power_val",
    db_uri=DB_URI
)
An example of forecasting:
from synxml.timeseries.models import DLinearRegressor

dlinear = DLinearRegressor.from_model_name(
    "DLinearRegressor_d4803486a5a542e293f6d075226037fd",  # replace with your model name
    db_uri_models=DB_URI
)
pred_scaled = dlinear.predict(
    input_tblname="power_val",
    output_tblname="power_predictions",
    db_uri=DB_URI
)
print("Forecasting completed.")
print(pred_scaled)
Use AutoML capabilities
SynxML includes powerful AutoML features for automated model selection and hyperparameter tuning.
Use cross-validation
from synxml.models import RandomForestClassifier, cross_validate
# Initialize model for cross-validation
rf_cv_model = RandomForestClassifier()
# Configure cross-validation
cv_config = {
'y': 'Species_encoded',
'cv': 5, # 5-fold cross-validation
'scoring': ['accuracy', 'f1_macro', 'precision'],
'n_jobs': -1 # Use all available cores
}
print("Performing 5-fold cross-validation...")
cv_results = cross_validate(
model=rf_cv_model,
train_tblname='iris_processed',
db_uri=DB_URI,
train_config=cv_config
)
print("Cross-validation results:\n", cv_results)
Use hyperparameter tuning
from synxml.models import XGBoost, param_tune
# Initialize model for tuning
xgb_tune_model = XGBoost()
# Define parameter grid
tune_config = {
'y': 'Species_encoded',
'cv': 3,
'scoring': 'accuracy',
'n_jobs': -1,
'param_grid': {
'n_estimators': [100, 200],
'max_depth': [3, 5, 7],
'learning_rate': [0.01, 0.1, 0.3],
'subsample': [0.8, 1.0]
}
}
print("Starting hyperparameter tuning...")
best_params = param_tune(
model=xgb_tune_model,
train_tblname='iris_processed',
db_uri=DB_URI,
train_config=tune_config
)
print("\nBest parameters found:")
for param, value in best_params.items():
print(f"{param}: {value}")
Use Flow - Data processing pipelines
Flow provides a powerful pipeline system for building complex data processing workflows.
DB_URI = "postgresql://<user>@<ip>:<port>/<db_name>"
Use text processing workflow
from synxml.flow.pipeline.workflow import *
from synxml.flow.pipeline.pipeline import Pipeline
# 1. Define the nodes of the workflow
reader = DataSourceNode(
id="reader_text",
table_type=TableType.DIRTABLE,
table_name="dirtable_pdf", # Replace with your dirtable name
db_uri=DB_URI,
file_types=["pdf"]
)
parser = FileParserNode(
id="parser_pdf",
file_types=FileType.PDF,
)
splitter = SplitterNode(
id="splitter_text",
max_length=1024,
level=1,
)
writer = DataSinkNode(
id="writer_summary",
table_type=TableType.TABLE,
table_name="summarization_results_quickstart",
db_uri=DB_URI,
)
# 2. Assemble the workflow
text_workflow = WorkFlow(
id="text_summarization_workflow",
name="text_summarization_pipeline",
nodes=[reader, parser, splitter, writer],
dependencies = {
"reader_text": [],
"parser_pdf": ["reader_text"],
"splitter_text": ["parser_pdf"],
"writer_summary": ["splitter_text"]
},
db_role = "gpadmin"
)
# 3. Validate the workflow
workflow_json = text_workflow.model_dump_json()
text_workflow = WorkFlow.model_validate_json(workflow_json)
text_workflow.validate_dependencies()
text_workflow.validate_parallel_structure()
print("Text workflow created and validated.")
# 4. Build and run the pipeline (optional)
# Running the pipeline requires access to the specified data and services.
pipe = Pipeline.build_from_workflow(text_workflow)
ds = pipe.run()
ds.show()
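After a successful run, the split text chunks are written to the sink table. One way to inspect them, reusing read_dataset from synxml.data:
from synxml.data import read_dataset

read_dataset("summarization_results_quickstart", DB_URI).show(3)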
Use image processing workflow
from synxml.flow.pipeline.workflow import *
from synxml.flow.pipeline.pipeline import Pipeline
# 1. Define nodes
vision_reader = DataSourceNode(
id="reader_vision",
table_type=TableType.DIRTABLE,
table_name="test_vision", # Replace with your dirtable name
db_uri=DB_URI,
file_types="jpg"
)
vision_parser = FileParserNode(
id="parser_jpg",
file_types=FileType.JPG,
columns_mapping = {
"input_cols": {"content": ("content", "str")},
"output_cols": {"base64": ("base64", "str")}
},
extract_reserved_columns = ["file_type"]
)
# NOTE: You need a running vision service for this node.
vision_detector = VisionNode(
id="vision_detector",
service_type="local",
task_type="detect",
end_point="http://localhost:8000/yolo/v1/detect", # Replace with your endpoint
columns_mapping = {
"input_cols": {"base64": ("base64", "str")},
"output_cols": {"vision_output": ("vision_output", "str")}
}
)
vision_writer = DataSinkNode(
id="writer_vision",
table_type=TableType.TABLE,
table_name="vision_results_quickstart", # Replace with your table name
db_uri=DB_URI,
)
# 2. Assemble workflow
vision_workflow = WorkFlow(
id="vision_workflow",
name="vision_pipeline",
nodes=[vision_reader, vision_parser, vision_detector, vision_writer]
)
# 3. Validate
workflow_json = vision_workflow.model_dump_json()
vision_workflow = WorkFlow.model_validate_json(workflow_json)
vision_workflow.validate_dependencies()
vision_workflow.validate_parallel_structure()
print("Vision workflow created and validated.")
# 4. Build and run (optional)
#from synxml.flow.pipeline.pipeline import Pipeline
pipe = Pipeline.build_from_workflow(vision_workflow)
ds = pipe.run()
ds.show()
Use speech recognition workflow
from synxml.flow.pipeline.workflow import *
from synxml.flow.pipeline.pipeline import Pipeline
# 1. Define nodes
speech_reader = DataSourceNode(
id="reader_speech",
table_type=TableType.DIRTABLE,
table_name="test_dir1", # Replace with your dirtable name
db_uri=DB_URI,
file_types=["mp3"]
)
speech_parser = FileParserNode(
id="parser_mp3",
file_types=FileType.MP3,
columns_mapping = {
"input_cols": {"content": ("content", "str")},
"output_cols": {"audio": ("audio_base64", "str")}
},
extract_reserved_columns = ["file_types"]
)
# NOTE: You need a running speech recognition service for this node.
speech_recognizer = SpeechRecognitionNode(
id="recognizer_speech",
service_type=SpeechRecognitionServiceType.API,
model_name="SenseVoiceSmall",
end_point="http://10.14.10.1:8000/sensevoice/v1/asr", # Replace with your endpoint
columns_mapping = {
"input_cols": {"audio": ("audio_base64", "str")},
"output_cols": {"text": ("text", "str")}
},
)
speech_writer = DataSinkNode(
id="writer_speech",
table_type=TableType.TABLE,
table_name="speech_results_quickstart",
db_uri=DB_URI,
)
# 2. Assemble workflow
speech_workflow = WorkFlow(
id="speech_workflow",
name="speech_pipeline",
nodes=[speech_reader, speech_parser, speech_recognizer, speech_writer],
dependencies = {
"reader_speech": [],
"parser_mp3": ["reader_speech"],
"recognizer_speech": ["parser_mp3"],
"writer_speech": ["recognizer_speech"]
},
db_role = "gpadmin"
)
# 3. Validate
workflow_json = speech_workflow.model_dump_json()
speech_workflow = WorkFlow.model_validate_json(workflow_json)
speech_workflow.validate_dependencies()
speech_workflow.validate_parallel_structure()
print("Speech workflow created and validated.")
# 4. Build and run (optional)
# from synxml.flow.pipeline.pipeline import Pipeline
pipe = Pipeline.build_from_workflow(speech_workflow)
ds = pipe.run()
ds.show()
Use RAG
SynxML includes features for building retrieval-augmented generation (RAG) applications.
Use text embeddings
from synxml.reqa import TextEmbedder
embedder = TextEmbedder(
"/tmp/models/jina-embeddings-v2-base-zh", # replace to your model path
max_seq_length=512,
use_gpu=False
)
embedder.embed("Hi")
Use multi-modal embeddings
from synxml.reqa import MultimodalEmbedder
multi_embedder = MultimodalEmbedder(
"/tmp/models/bge-vl-base", # replace to your model path
use_gpu=False
)
multi_embedder.embed([{"text": "Hi", "image": "iVBORw0KGgoAAAANSUhEUgAAAAoAAAAKCAIAAAACUFjqAAAAE0lEQVR4nGP8z4APMOGVZRip0gBBLAETee26JgAAAABJRU5ErkJggg=="}])
Use reranking
from synxml.reqa import ReRanker
reranker = ReRanker(
"/tmp/models/jina-embeddings-v2-base-zh", # replace to your model path
max_seq_length=512,
use_gpu=False
)
reranker.rank("Hi", ["Hello", "Hi", "Goodbye"])
Use vector knowledge base
from synxml.reqa import DocVKB
from synxml.client import EmbeddingAPI
embedder = EmbeddingAPI(
"http://10.14.10.1:8000/embedding/v1/embeddings", # replace with your own endpoint
"jina-embeddings-v2-base-zh" # replace with your own model name
)
vkb = DocVKB(
DB_URI,
kb_table_name="kb_0fa280128054413aaf47adcc0f5cb8ff", # replace with your own table name
embedder=embedder,
)
vkb.embedding_search("Hi")
Best practices and tips
Follow these best practices for optimal results with SynxML.
Use data management best practices
✓ Always validate data quality before training.
✓ Use appropriate data types for columns.
✓ Implement proper train/test splits.
✓ Handle missing values appropriately.
✓ Normalize/scale numerical features.
✓ Encode categorical variables properly.
✓ Monitor data drift in production.
✓ Use version control for datasets.
✓ Document data sources and transformations.
✓ Implement data privacy and security measures.
Model training tips
✓ Start with simple models before complex ones.
✓ Use cross-validation for robust evaluation.
✓ Tune hyperparameters systematically.
✓ Monitor for overfitting and underfitting.
✓ Use appropriate evaluation metrics.
✓ Validate model assumptions.
✓ Consider ensemble methods.
✓ Document model configurations.
✓ Save model artifacts properly.
✓ Test model inference performance.