Comments (16)

6fj commented on June 14, 2024

Hi @mingo0117 ,

First, you need to enable the corresponding logging by setting the SPU config:

Then, you need to turn on log_to_driver when initializing secretflow, for example:

import secretflow as sf

sf.init(['alice', 'bob'], num_cpus=8, log_to_driver=True)

from secretflow.

mingo0117 commented on June 14, 2024

The above was with the semi2k protocol. When I later switched to aby3, I got this instead:

Traceback (most recent call last):
  File "/home/ops/anaconda3/envs/secretflow/bin/kernprof", line 8, in <module>
    sys.exit(main())
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/kernprof.py", line 220, in main
    execfile(script_file, ns, ns)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/kernprof.py", line 28, in execfile
    exec(compile(f.read(), filename, 'exec'), globals, locals)
  File "test_cypher_logistic_gegression_3pc.py", line 167, in <module>
    test()
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/line_profiler/line_profiler.py", line 110, in wrapper
    result = func(*args, **kwds)
  File "test_cypher_logistic_gegression_3pc.py", line 153, in test
    losses = sf.reveal(losses)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/secretflow/device/driver.py", line 158, in reveal
    value_obj = ray.get(value_ref)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/ray/worker.py", line 1843, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(MemoryError): ray::SPURuntime.run() (pid=32328, ip=10.100.82.134, repr=<secretflow.device.device.spu.SPURuntime object at 0x7f3288065220>)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/secretflow/device/device/spu.py", line 224, in run
    self.runtime.run(executable)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/spu/binding/api.py", line 43, in run
    return self._vm.Run(executable.SerializeToString())
MemoryError: std::bad_alloc

I am currently using 3 compute nodes, each with 8 cores and 16 GB of memory.

6fj commented on June 14, 2024

Hi @mingo0117 ,

  1. What machine resources do you currently have?
  2. Does the job run successfully if you reduce the dataset size?

mingo0117 commented on June 14, 2024

It is a Ray cluster of 3 machines, each with 8 cores and 16 GB of memory.
After reducing the dataset size I get this error:

Traceback (most recent call last):
  File "/home/ops/anaconda3/envs/secretflow/bin/kernprof", line 8, in <module>
    sys.exit(main())
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/kernprof.py", line 220, in main
    execfile(script_file, ns, ns)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/kernprof.py", line 28, in execfile
    exec(compile(f.read(), filename, 'exec'), globals, locals)
  File "test_cypher_logistic_gegression_3.py", line 152, in <module>
    test()
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/line_profiler/line_profiler.py", line 110, in wrapper
    result = func(*args, **kwds)
  File "test_cypher_logistic_gegression_3.py", line 139, in test
    losses = sf.reveal(losses)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/secretflow/device/driver.py", line 158, in reveal
    value_obj = ray.get(value_ref)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/ray/worker.py", line 1845, in get
    raise value
ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, ray::SPURuntime.__init__() (pid=4100, ip=10.100.82.173, repr=<secretflow.device.device.spu.SPURuntime object at 0x7fed51ed8400>)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/secretflow/device/device/spu.py", line 125, in __init__
    self.link = link.create_brpc(desc, rank)
RuntimeError: what: 
        [external/yasl/yasl/link/context.cc:140] connect to mesh failed, failed to setup connection to rank=1
stacktrace: 
#0 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7feda8c28ed7
#1 pybind11::cpp_function::dispatcher()+0x7feda8c150cb
#2 cfunction_call_varargs+0x561b974ea00e

6fj commented on June 14, 2024

Hi @mingo0117 ,

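It looks like the SPU device was not created successfully, even though it should have worked for you before. Please make sure that:

  • all Ray nodes have been cleaned up;
  • you provide the SPU config yourself and the ports it uses are not already occupied.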
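
A quick way to check the port point before creating the SPU is to try binding the port on each node. This is a minimal sketch using only the Python standard library; `port_is_free` is a hypothetical helper, not part of secretflow or SPU, and 18888 is simply the port that appears in the cluster config posted in this thread.

```python
import socket


def port_is_free(port: int, host: str = "0.0.0.0") -> bool:
    """Return True if a TCP socket can currently be bound to host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            # Typically "address already in use": another process (for
            # example a runtime left over from an earlier run) holds the port.
            return False
    return True


if __name__ == "__main__":
    # Run this on each node before calling sf.SPU(...).
    print(port_is_free(18888))
```

If this reports False on a node, the process still holding the port has to be stopped (or a different port chosen in the config) before the SPU is created again.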

6fj commented on June 14, 2024

Please refer to #66.

6fj commented on June 14, 2024

Hi @mingo0117

Please try forcing the SPU device to be rebuilt as follows:

sf.shutdown()

sf.init(['alice', 'bob', 'carol'], num_cpus=8, log_to_driver=False)

alice, bob, carol = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('carol')
spu_3pc = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob', 'carol']))

Thanks.

mingo0117 commented on June 14, 2024

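OK, the SPU device rebuild problem is gone. I would still like to talk about the issue I raised at the very beginning. I solved it myself and it is quite interesting, but I do not understand why: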

2022-07-28 16:16:13,219 ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::SPURuntime.run() (pid=13081, ip=10.100.82.74, repr=<secretflow.device.device.spu.SPURuntime object at 0x7f1fd47b1220>)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/secretflow/device/device/spu.py", line 224, in run
    self.runtime.run(executable)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/spu/binding/api.py", line 43, in run
    return self._vm.Run(executable.SerializeToString())
RuntimeError: what: 
        [external/yasl/yasl/link/transport/channel.cc:86] Get data timeout, key=root:110:ALLGATHER
stacktrace: 
#0 yasl::link::Context::RecvInternal()+0x7f202eb100b2
#1 yasl::link::AllGatherImpl<>()+0x7f202e9c8785
#2 yasl::link::AllGather()+0x7f202e9c8cb4
#3 spu::mpc::Communicator::allReduce()+0x7f202e2c7a37
#4 spu::mpc::semi2k::B2A_Randbit::proc()::{lambda()#1}::operator()()::{lambda()#3}::operator()()+0x7f202e2bd9f2
#5 spu::mpc::semi2k::B2A_Randbit::proc()+0x7f202e2c0a89
#6 spu::mpc::UnaryKernel::evaluate()+0x7f202e19efdb
#7 spu::mpc::Object::call<>()+0x7f202e2c60b8
#8 spu::mpc::(anonymous namespace)::_Lazy2A()+0x7f202e2dfb19
#9 spu::mpc::ABProtAddSP::proc()+0x7f202e2e019b
#10 spu::mpc::BinaryKernel::evaluate()+0x7f202e19f2f2
#11 spu::mpc::Object::call<>()+0x7f202e2c6866
#12 spu::mpc::add_sp()+0x7f202e2c6994
#13 spu::hal::_add_sp()+0x7f202e171b63
#14 spu::hal::_add()+0x7f202e167486
#15 spu::hal::_popcount()+0x7f202e168b8c

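This error goes away as soon as I switch the data source to the example from the official site:
features, label = load_breast_cancer(return_X_y=True)
But I want to run LR on my own data source, so I wrote: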

features = np.array(pd.read_csv(r'{0}'.format(x_csv_path), sep=',', usecols=range(20)))
label = np.array(pd.read_csv(r'{0}'.format(y_csv_path)))

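Then something strange happened: the job kept running, CPU and memory on the machines were maxed out, and it finally failed with the Get data timeout error. Switching protocols only turned it into a memory error. It looked like a problem with reading the data through pandas. I then looked at the source of load_breast_cancer, saw that it does not use pandas, and switched to reading with np instead: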

features = np.loadtxt(open(x_csv_path, "rb"), delimiter=",", usecols=range(20))
label = np.loadtxt(open(y_csv_path, "rb"))

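Problem solved... the result came back in a little over 30 seconds. Why is that? Reading with pandas this way works perfectly well when I run LR in plaintext, so I never suspected the way the data was read.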
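
One possible explanation, offered as a hypothesis that this thread does not confirm: `pd.read_csv` returns a DataFrame, so a single-column label file becomes a 2-D array of shape (n, 1) after `np.array(...)`, whereas `np.loadtxt` returns a 1-D array of shape (n,). In an expression like `preds * targets`, an (n,) array times an (n, 1) array broadcasts to (n, n), which grows quadratically with the number of samples. The sketch below uses plain NumPy with a made-up sample count; `jax.numpy` follows the same broadcasting rules.

```python
import numpy as np

n = 1000  # made-up sample count, only for illustration

preds = np.full((n,), 0.5)             # shape (n,), like predict(W, b, inputs)
targets_1d = np.ones((n,))             # shape (n,), like np.loadtxt on a one-column file
targets_2d = targets_1d.reshape(n, 1)  # shape (n, 1), like np.array(pd.read_csv(...))

# Same expression as in the loss function posted in this thread.
probs_1d = preds * targets_1d + (1 - preds) * (1 - targets_1d)
probs_2d = preds * targets_2d + (1 - preds) * (1 - targets_2d)

print(probs_1d.shape)  # (1000,)
print(probs_2d.shape)  # (1000, 1000)
```

If this is what happened, flattening the labels (for example with `label.ravel()`) before training would restore the (n,) shape; whether that alone accounts for the timeout and the memory error here is an assumption.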

6fj commented on June 14, 2024

That is an interesting phenomenon. Could you post the complete code or the steps to reproduce it? Thanks!

mingo0117 commented on June 14, 2024

# ******* logistic regression util start*************************

import timeit

import jax.numpy as jnp
import matplotlib.pyplot as plt
import numpy as np
from jax import jit
from jax import value_and_grad
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


def load_train_dataset(x_csv_path, y_csv_path, party_id=None) -> (np.ndarray, np.ndarray):
    #features = np.array(pd.read_csv(r'{0}'.format(x_csv_path), sep=',', usecols=range(20)))
    #label = np.array(pd.read_csv(r'{0}'.format(y_csv_path)))
    features = np.loadtxt(open(x_csv_path, "rb"), delimiter=",", usecols=range(20))
    label = np.loadtxt(open(y_csv_path, "rb"))
    # features, label = load_breast_cancer(return_X_y=True)
    scaler = StandardScaler()
    features = scaler.fit_transform(features)
    X_train, _, y_train, _ = train_test_split(
        features, label, test_size=0.2, random_state=42
    )

    if party_id:
        if party_id == 1:
            return X_train[:, 0:6], _
        elif party_id == 2:
            return X_train[:, 6:13], _
        else:
            return X_train[:, 13:20], y_train
    else:
        return X_train, y_train


def load_test_dataset(x_csv_path, y_csv_path):
    #features = np.array(pd.read_csv(r'{0}'.format(x_csv_path), sep=',', usecols=range(20)))
    #label = np.array(pd.read_csv(r'{0}'.format(y_csv_path)))
    features = np.loadtxt(open(x_csv_path, "rb"), delimiter=",", usecols=range(20))
    label = np.loadtxt(open(y_csv_path, "rb"))
    # features, label = load_breast_cancer(return_X_y=True)
    scaler = StandardScaler()
    features = scaler.fit_transform(features)
    _, X_test, _, y_test = train_test_split(
        features, label, test_size=0.2, random_state=42
    )
    return X_test, y_test


def sigmoid(x):
    return 1 / (1 + jnp.exp(-x))


# Outputs probability of a label being true.
def predict(W, b, inputs):
    return sigmoid(jnp.dot(inputs, W) + b)


# Training loss is the negative log-likelihood of the training examples.
def loss(W, b, inputs, targets):
    preds = predict(W, b, inputs)
    label_probs = preds * targets + (1 - preds) * (1 - targets)
    return -jnp.mean(jnp.log(label_probs))

@jit
def train_step(W, b, x1, x2, x3, y, learning_rate):
    x = jnp.concatenate([x1, x2, x3], axis=1)
    # x = jnp.hstack([x1, x2, x3])
    # print(x)
    loss_value, Wb_grad = value_and_grad(loss, (0, 1))(W, b, x, y)
    W -= learning_rate * Wb_grad[0]
    b -= learning_rate * Wb_grad[1]
    return loss_value, W, b


def fit(W, b, x1, x2, x3, y, epochs=1, learning_rate=1e-2):
    losses = jnp.array([])
    for _ in range(epochs):
        l, W, b = train_step(W, b, x1, x2, x3, y, learning_rate=learning_rate)
        losses = jnp.append(losses, l)
    return losses, W, b


def validate_model(W, b, X_test, y_test):
    y_pred = predict(W, b, X_test)
    return roc_auc_score(y_test, y_pred)


def plot(losses):
    # Create the Figure object
    plt.figure(figsize=(20, 12))
    # plt.subplot(1, 2, 1)
    plt.plot(np.arange(len(losses)), losses)
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.title("Loss Graph")
    plt.savefig("cypher_logistic_gegression_loss.jpg")


ray_cluster_address = '10.100.82.173:6379'
x_csv_path = '/home/ops/svm_two_class_train_dense_data.csv'
y_csv_path = '/home/ops/svm_two_class_train_dense_label.csv'
learn_rate = 0.001
epochs = 5


# ******* logistic_regression_util end *************************

def test():
    global alice, bob, charlie, spu, device, losses
    import secretflow as sf
    import spu as sp
    # In case you have a running secretflow runtime already.
    sf.shutdown()
    # Standalone
    # sf.init(['alice', 'bob'], num_cpus=8, log_to_driver=True)
    # Cluster
    sf.init(address=ray_cluster_address)
    alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')
    # Standalone
    # spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))
    # Cluster
    spu = sf.SPU({
        'nodes': [{'party': 'alice',
                   'id': '0',
                   'address': '10.100.82.173:18888', },
                  {'party': 'bob',
                   'id': '1',
                   'address': '10.100.82.74:18888', },
                  {'party': 'charlie',
                   'id': '2',
                   'address': '10.100.82.134:18888', }],
        'runtime_config': {
            'protocol': sp.spu_pb2.SEMI2K,
            'field': sp.spu_pb2.FM128,
        },
    })
    x1, _ = alice(load_train_dataset)(x_csv_path, y_csv_path, party_id=1)
    x2, _ = bob(load_train_dataset)(x_csv_path, y_csv_path, party_id=2)
    x3, y = charlie(load_train_dataset)(x_csv_path, y_csv_path, party_id=3)
    device = spu
    W = jnp.zeros((20,))
    b = 0.0
    W_, b_, x1_, x2_, x3_, y_ = (
        sf.to(device, W),
        sf.to(device, b),
        x1.to(device),
        x2.to(device),
        x3.to(device),
        y.to(device),
    )
    losses, W_, b_ = device(fit, static_argnames=['epochs'], num_returns=3)(
        W_, b_, x1_, x2_, x3_, y_, epochs=epochs, learning_rate=learn_rate
    )
    losses = sf.reveal(losses)
    W = sf.reveal(W_)
    b = sf.reveal(b_)
    print(losses)
    print("losses={0}".format(losses))
    # print("W={0}".format(W))
    # print("b={0}".format(b))
    X_test, y_test = load_test_dataset(x_csv_path, y_csv_path)
    auc = validate_model(W, b, X_test, y_test)
    print(f'auc={auc}')


print("Elapsed: {0} s".format(timeit.timeit('test()', setup='from __main__ import test', number=1)))

That is everything.

mingo0117 commented on June 14, 2024

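Looking at load_train_dataset is enough; the only difference is in the commented-out lines. You can try it on a three-party cluster with any 20-column test set.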

longshan-ant commented on June 14, 2024

Could you use np.array_equal to check whether the data read by np.loadtxt and by pd.read_csv differ?
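
Such a check could look like the sketch below. It uses a small in-memory CSV in place of the real files (the contents are made up), and `compare_arrays` is a hypothetical helper. Shapes and dtypes are reported too, because `np.array_equal` simply returns False when the shapes differ, without saying why.

```python
import io

import numpy as np
import pandas as pd


def compare_arrays(a: np.ndarray, b: np.ndarray) -> dict:
    """Summarize how two arrays differ: shapes, dtypes and element equality."""
    return {
        "shapes": (a.shape, b.shape),
        "dtypes": (str(a.dtype), str(b.dtype)),
        "equal": bool(np.array_equal(a, b)),
    }


# Made-up CSV content without a header line, standing in for the feature file.
csv_text = "1.0,2.0\n3.0,4.0\n5.0,6.0\n"

via_numpy = np.loadtxt(io.StringIO(csv_text), delimiter=",")
via_pandas = np.array(pd.read_csv(io.StringIO(csv_text), sep=","))

report = compare_arrays(via_numpy, via_pandas)
print(report["shapes"])  # ((3, 2), (2, 2))
print(report["equal"])   # False
```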

mingo0117 commented on June 14, 2024

Yes, they differ. With pd.read_csv written the way I had it, the first row was not read as data, because I had not passed header=None.
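
To illustrate the cause: by default `pd.read_csv` uses `header='infer'`, which treats the first line as column names when no `names` are given, so a file without a header line loses its first data row. Passing `header=None` keeps it. A minimal sketch on made-up in-memory data:

```python
import io

import numpy as np
import pandas as pd

# Made-up data without a header line.
csv_text = "1.0,2.5,4.0\n0.5,8.0,16.0\n"

expected = np.loadtxt(io.StringIO(csv_text), delimiter=",")

default_read = pd.read_csv(io.StringIO(csv_text)).to_numpy()
fixed_read = pd.read_csv(io.StringIO(csv_text), header=None).to_numpy()

print(default_read.shape)                    # (1, 3): the first row became the header
print(np.array_equal(expected, fixed_read))  # True
```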

6fj commented on June 14, 2024

Hi @mingo0117

As far as we know, serialization is costly in Ray, so please use the NumPy I/O API whenever possible. Please refer to https://docs.ray.io/en/releases-1.11.1/ray-core/serialization.html#serialization.

If you have to use pandas for I/O, please also check https://docs.ray.io/en/latest/data/modin/index.html#using-pandas-on-ray-modin.
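
If pandas is still the more convenient parser, one option (a sketch of a general pattern, not something prescribed in this thread) is to convert the DataFrame to a plain NumPy array inside the loading function, so that only an ndarray is returned and passed on. `load_as_ndarray` is a hypothetical helper and the in-memory CSV is made up.

```python
import io

import numpy as np
import pandas as pd


def load_as_ndarray(source, n_cols: int) -> np.ndarray:
    """Parse a CSV without a header line using pandas; return a float64 ndarray."""
    frame = pd.read_csv(source, sep=",", header=None, usecols=range(n_cols))
    # to_numpy drops the pandas index and column labels; ascontiguousarray
    # guarantees a C-contiguous buffer.
    return np.ascontiguousarray(frame.to_numpy(dtype=np.float64))


features = load_as_ndarray(io.StringIO("1.0,2.0,9.0\n3.0,4.0,9.0\n"), n_cols=2)
print(type(features).__name__, features.shape, features.dtype)  # ndarray (2, 2) float64
```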

Thanks.

mingo0117 commented on June 14, 2024

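Thanks. One more question: does SPU have any protocol-related or computation-related logs? How do I turn them on? I would like to see what is happening inside.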

mingo0117 commented on June 14, 2024

Understood, thank you very much.
