# Torlink **Repository Path**: iscas-ssg/torlink ## Basic Information - **Project Name**: Torlink - **Description**: 一个面向高速流式数据的机器学习系统 - **Primary Language**: Unknown - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-06-13 - **Last Updated**: 2024-06-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 系统介绍 Torlink是一个面向高速流式数据的机器学习系统,支持利用大规模流式数据训练基于梯度下降法的算法模型。Torlink基于Apache Flink以及PyTorch进行扩展,拥有强大的并行化处理能力同时提供了丰富的算法模型,此外,Torlink还集成了额外三种优化方法与机制,使得其性能有了进一步提高。 基于SUSY与CoverType数据的测试显示,在保证模型精度的情况下,Torlink在4节点集群上,总体吞吐率最高可达Flink ML的 4.1 倍,同时加速比可达3.3。 # 构建与安装 Torlink分为两部分,分别是[Torlink-java](./torlink-java/)与[Torlink-py](./torlink-py/),需要对两个部分分别进行构建与安装。 **前置要求** - Unix-like环境(Linux, Mac OS X, Cygwin, WSL) - Maven 3及以上 - JDK 11及以上 - Python 3.9及以上 构建与安装Torlink-java ```bash cd torlink-java && mvn clean install -DskipTests && cd - ``` Torlink-java现在位于build-target 构建与安装Torlink-py ```bash cd torlink-py python setup.py build python setup.py install cd - ``` Torlink-py现在位于当前python环境 # 系统使用 使用Torlink需要分别编写Torlink-java与Torlink-py程序,然后分别启动Flink任务与PyTorch任务。 - Torlink-java 下面是一个Torlink-java程序例子,该样例生成一批18维的2分类样本,再对样本进行动态平衡采样,然后对样本进行最大最小归一化,最后将预处理完成的数据输出到Kafka消息队列中。 ```java public static void main(String[] args) throws Exception { KafkaUtil.prepareTopic(brokerAddr, topicName, parallelism); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); int numFeatures = 18; DataGenSource source = new DataGenSource( numFeatures, allSamples / parallelism, 2, false, secondSpeed, false, null, null, -1); DataStream trainData = env.addSource(source).name("gen_data_source"); if (initSampleRatio > 0) { SamplerConf conf = new SamplerConf( redisAddr, redisPort, 0.1, 0.98, 1000, "source_sp", "ml_sp", "sample_ratio"); if (samplerType.equals("coreset")) { CoresetSampler sampler = new CoresetSampler(numFeatures, 80, initSampleRatio, 2, conf, true); trainData = trainData.process(sampler); } else { RandomSampler sampler = new RandomSampler(initSampleRatio, conf, true); trainData = trainData.process(sampler); } } int[] featureIndices = new int[numFeatures]; for (int i = 0; i < numFeatures; i++) { featureIndices[i] = i; } MinMaxScaler scaler = new MinMaxScaler(4000, featureIndices, redisAddr, redisPort, "torlink_minmax"); trainData = trainData.process(scaler); KafkaSink sink = KafkaSink.builder() .setBootstrapServers(brokerAddr) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic(topicName) .setPartitioner(new FlinkFixedPartitioner<>()) .setValueSerializationSchema(new DenseVectorSerialization()) .build()) .build(); trainData.sinkTo(sink).name("kafka_sink"); env.execute("torlink lr"); } ``` 对如上样例使用Maven进行构建,然后按照标准Flink应用进行提交运行即可。 - Torlink-py 如下是一个Torlink-py样例,该样例从Kafka中读取预处理完成的数据,然后使用[PyTorch DDP](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html)分布式地训练一个LogisticRegression模型,同时定期对模型参数进行快照并汇报当前模型训练速度。 ```bash def run(): args = arg_parser().parse_args() bootstrap_servers = args.bootstrap_servers topic = args.topic batch_size = args.batch_size window_size = args.window_size parallesim = args.parallesim ck_interval = args.ck_interval num_workers = 1 num_features = 18 setup_env() world_size = dist.get_world_size() rank = dist.get_rank() # 连接数据通道 kafka_dataset = KafkaDataSet( topic=topic, bootstrap_servers=bootstrap_servers, partitions=parallesim, world_size=world_size, rank=rank, ) loader = DataLoader( kafka_dataset, batch_size=batch_size, num_workers=num_workers, worker_init_fn=worker_init_fn1, pin_memory=True, prefetch_factor=2, drop_last=True, ) # 定义模型 net = LogisticRegression(num_features, 1) ddp_net = Ddp(net) loss_fn = torch.nn.BCELoss() optimizer = torch.optim.SGD(ddp_net.parameters(), lr=1e-3) # 模型训练 trainer = Trainer( ddp_net, loss_fn, optimizer, loader, window_type="count", window_size=window_size, window_threshold=batch_size, rank=rank, redis_addr="redis-ip", redis_port=6379, log_interval=10, ck_interval=ck_interval, ck_key="model_ck", report_speed_interval=1, speed_key="ml_sp", ) trainer.train() cleanup_env() ``` 对以上样例程序,使用[torchrun](https://pytorch.org/docs/stable/elastic/quickstart.html)运行。 两者开始运行后,即可组成一个完成的流式机器学习应用。 # 开源协议 本项目使用第2版木兰宽松许可证。