package edu.buaa.batch;

import edu.buaa.common.benchmark.MilestoneBuilder;
import edu.buaa.common.transaction.ImportStaticDataTx;
import edu.buaa.common.utils.PFieldList;
import edu.buaa.utils.Helper;

import org.neo4j.batchinsert.BatchInserter;
import org.neo4j.batchinsert.BatchInserters;
import org.neo4j.configuration.Config;
import org.neo4j.configuration.GraphDatabaseSettings;
import org.neo4j.dbms.api.DatabaseManagementService;
import org.neo4j.dbms.api.DatabaseManagementServiceBuilder;
import org.neo4j.graphdb.*;
import org.neo4j.io.layout.DatabaseLayout;
import org.neo4j.io.layout.Neo4jLayout;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * Loads a static network and its temporal properties into a Neo4j store through the
 * {@link BatchInserter} API.
 *
 * <p>Static nodes are created with the {@code Object} label and static edges with the
 * {@code static} relationship type. Temporal node properties are stored as
 * {@code (Object)-[:HAS_ATTRIBUTE]->(Attribute)-[:HAS_VALUE]->(Value)} chains, one {@code Value}
 * node per CSV cell. Temporal edge properties are stored as one extra {@code temporal}
 * relationship per CSV row, between the endpoints of the corresponding static edge.
 *
 * <p>{@link #importTemporal()} relies on the id maps filled by {@link #importStatic()}, so the
 * static import has to run first on the same instance.
 */
public class TGQLBulkLoad extends MilestoneBuilder {
    private static final Label OBJECT_LABEL = Label.label("Object");
    private static final Label ATTRIBUTE_LABEL = Label.label("Attribute");
    private static final Label VALUE_LABEL = Label.label("Value");
    private static final RelationshipType RELATIONSHIP_TYPE_STATIC = RelationshipType.withName("static");
    private static final RelationshipType RELATIONSHIP_TYPE_TEMPORAL = RelationshipType.withName("temporal");

    /** Connects an object node to each of its attribute nodes. */
    private static final RelationshipType HAS_ATTRIBUTE = RelationshipType.withName("HAS_ATTRIBUTE");
    /** Connects an attribute node to each of its value nodes. */
    private static final RelationshipType HAS_VALUE = RelationshipType.withName("HAS_VALUE");

    // Column layout shared by both temporal CSV files.
    private static final int START_TIME_COL = 0;
    private static final int END_TIME_COL = 1;
    private static final int ENTITY_COL = 2;
    /** Index of the first property column; everything before it is fixed metadata. */
    private static final int FIRST_PROP_COL = 3;

    protected final File dbDir;
    private DatabaseLayout databaseLayout = null;
    /** Original node id (u_sid) -> node id assigned by Neo4j. */
    protected final Map<String, Long> nMap = new HashMap<>();
    /** Original relationship id (u_sid) -> relationship id assigned by Neo4j. */
    protected final Map<String, Long> rMap = new HashMap<>();
    /** Original relationship id (u_sid) -> Neo4j id of its start node. */
    protected final Map<String, Long> rFromMap = new HashMap<>();
    /** Original relationship id (u_sid) -> Neo4j id of its end node. */
    protected final Map<String, Long> rToMap = new HashMap<>();
    /** {@code u_sid + "|" + property name} -> Neo4j id of the attribute node. */
    protected final Map<String, Long> nAttrMap = new HashMap<>();

    /**
     * Creates the loader and resolves the database directory from the {@code DB_PATH}
     * environment variable.
     *
     * @throws Exception propagated from the superclass constructor or from the environment lookup
     */
    public TGQLBulkLoad() throws Exception {
        super();
        this.dbDir = new File(Helper.mustEnv("DB_PATH"));
        System.out.println("DB dir: " + dbDir);
    }

    /**
     * No-op: every import method shuts down the inserter / database service it opens itself.
     */
    @Override
    public void close() throws Exception {
    }

    /**
     * Returns the layout of the {@code neo4j} database under {@link #dbDir}, building it on the
     * first call and reusing it afterwards.
     */
    private DatabaseLayout getDefaultDatabaseLayout() {
        if (databaseLayout == null) {
            Config.Builder configBuilder = Config.newBuilder();
            configBuilder.set(GraphDatabaseSettings.neo4j_home, dbDir.toPath().toAbsolutePath());
            Neo4jLayout layout = Neo4jLayout.of(configBuilder.build());
            databaseLayout = layout.databaseLayout("neo4j");
        }
        return databaseLayout;
    }

    /**
     * Imports the static network (nodes first, then edges, batch by batch) and afterwards creates
     * {@code u_sid} indexes on {@code Object} nodes and {@code static} relationships.
     *
     * @throws IllegalStateException if an edge refers to a node id that has not been imported
     * @throws Exception             propagated from the data generator or from Neo4j
     */
    @Override
    public void importStatic() throws Exception {
        BatchInserter bulkDB = BatchInserters.inserter(getDefaultDatabaseLayout());
        try {
            Iterator<ImportStaticDataTx> it = dataGen.readNetwork(8000);
            while (it.hasNext()) {
                ImportStaticDataTx tx = it.next();
                importStaticNodes(tx.getNodes(), bulkDB);
                importStaticRels(tx.getRels(), bulkDB);
            }
        } finally {
            // Always shut the inserter down so the store is flushed and released,
            // even when the import fails half way.
            bulkDB.shutdown();
        }
        createStaticIndexes();
    }

    /** Creates one {@code Object} node per row and records its Neo4j id in {@link #nMap}. */
    private void importStaticNodes(PFieldList nodesData, BatchInserter bulkDB) {
        int nSize = nodesData.size();
        for (int i = 0; i < nSize; i++) {
            String sid = nodesData.get("u_sid", i).s();
            long id = bulkDB.createNode(props(nodesData, i), OBJECT_LABEL);
            nMap.put(sid, id);
        }
    }

    /** Creates one {@code static} relationship per row and records its id and endpoints. */
    private void importStaticRels(PFieldList relData, BatchInserter bulkDB) {
        int rSize = relData.size();
        for (int i = 0; i < rSize; i++) {
            String sid = relData.get("u_sid", i).s();
            String fromId = relData.get("r_from", i).s();
            String toId = relData.get("r_to", i).s();
            Long s = nMap.get(fromId);
            Long e = nMap.get(toId);
            if (s == null || e == null) {
                // Fail with a diagnosable message instead of an unboxing NullPointerException.
                throw new IllegalStateException("edge endpoints not found. u_sid: " + sid
                        + ", r_from: " + fromId + ", r_to: " + toId);
            }
            long id = bulkDB.createRelationship(s, e, RELATIONSHIP_TYPE_STATIC, props(relData, i));
            rMap.put(sid, id);
            rFromMap.put(sid, s);
            rToMap.put(sid, e);
        }
    }

    /**
     * Opens the store as a regular database and creates the {@code u_sid} indexes for nodes and
     * static relationships. This runs only after the batch inserter has been shut down.
     */
    private void createStaticIndexes() {
        DatabaseManagementService dbms = new DatabaseManagementServiceBuilder(dbDir.toPath()).build();
        try {
            GraphDatabaseService db = dbms.database("neo4j");
            try (Transaction tx = db.beginTx()) {
                tx.schema().indexFor(OBJECT_LABEL).on("u_sid").create();
                tx.schema().indexFor(RELATIONSHIP_TYPE_STATIC).on("u_sid").create();
                tx.commit();
            }
        } finally {
            dbms.shutdown();
        }
    }

    /**
     * Collects every field of row {@code i} into a property map.
     *
     * @param data column-oriented field list
     * @param i    row index
     * @return a fresh mutable map from field name to the field's value
     */
    private Map<String, Object> props(PFieldList data, int i) {
        Map<String, Object> p = new HashMap<>();
        for (String key : data.keys()) {
            p.put(key, data.get(key, i).getVal());
        }
        return p;
    }

    /**
     * Imports temporal properties: first the node CSV, then the edge CSV, each in its own batch
     * inserter session.
     *
     * <p>NOTE(review): the first boolean passed to {@code prepareTPCSV} presumably selects node
     * ({@code true}) versus relationship ({@code false}) data - confirm against the generator.
     *
     * @throws Exception propagated from the data generator, file I/O or Neo4j
     */
    @Override
    public void importTemporal() throws Exception {
        File ntp = dataGen.prepareTPCSV(dataSize, startTime, endTime, true, true, false);
        BatchInserter nodeBulkDB = BatchInserters.inserter(getDefaultDatabaseLayout());
        try {
            loadNodeTemporalFromCsv(ntp, nodeBulkDB);
        } finally {
            nodeBulkDB.shutdown();
        }

        File rtp = dataGen.prepareTPCSV(dataSize, startTime, endTime, false, true, false);
        BatchInserter edgeBulkDB = BatchInserters.inserter(getDefaultDatabaseLayout());
        try {
            loadEdgeTemporalFromCsv(rtp, edgeBulkDB);
        } finally {
            edgeBulkDB.shutdown();
        }
    }

    /**
     * Returns the property names from a CSV header, i.e. every column from
     * {@link #FIRST_PROP_COL} onwards.
     */
    private static List<String> propertyNames(String headerLine) {
        String[] header = headerLine.split(",");
        List<String> names = new ArrayList<>();
        for (int i = FIRST_PROP_COL; i < header.length; i++) {
            names.add(header[i]);
        }
        return names;
    }

    /**
     * Loads temporal node properties. Each row is {@code start,end,u_sid,prop1,prop2,...}; every
     * property cell becomes a {@code Value} node attached to the (lazily created)
     * {@code Attribute} node of that object and property name.
     *
     * @param ntp    CSV file with a header line
     * @param bulkDB open batch inserter; not shut down by this method
     * @throws IllegalStateException if a row refers to a node that was not imported
     * @throws Exception             on I/O failure or malformed time columns
     */
    private void loadNodeTemporalFromCsv(File ntp, BatchInserter bulkDB) throws Exception {
        try (BufferedReader reader = new BufferedReader(new FileReader(ntp, StandardCharsets.UTF_8))) {
            String headerLine = reader.readLine();
            if (headerLine == null) {
                return;
            }
            List<String> propNames = propertyNames(headerLine);

            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    continue;
                }
                String[] arr = line.split(",", -1);
                if (arr.length < FIRST_PROP_COL) {
                    continue;
                }
                int st = Integer.parseInt(arr[START_TIME_COL]);
                int et = Integer.parseInt(arr[END_TIME_COL]);
                String entitySid = arr[ENTITY_COL];
                Long objId = nMap.get(entitySid);
                if (objId == null) {
                    throw new IllegalStateException("node not found. u_sid: " + entitySid);
                }

                for (int i = 0; i < propNames.size(); i++) {
                    int colIdx = FIRST_PROP_COL + i;
                    if (colIdx >= arr.length) {
                        // Row is shorter than the header: skip the missing cells,
                        // same as the edge loader does.
                        continue;
                    }
                    String prop = propNames.get(i);
                    Object value = parseValue(arr[colIdx]);

                    String key = attrKey(entitySid, prop);
                    Long attrId = nAttrMap.get(key);
                    if (attrId == null) {
                        // First value seen for this (object, property): create its attribute node.
                        Map<String, Object> attrProps = new HashMap<>();
                        attrProps.put("title", prop);
                        attrId = bulkDB.createNode(attrProps, ATTRIBUTE_LABEL);
                        bulkDB.createRelationship(objId, attrId, HAS_ATTRIBUTE, Collections.emptyMap());
                        nAttrMap.put(key, attrId);
                    }

                    Map<String, Object> valueProps = new HashMap<>();
                    valueProps.put("value", value);
                    valueProps.put("start_time", st);
                    valueProps.put("end_time", et);
                    long valueId = bulkDB.createNode(valueProps, VALUE_LABEL);
                    bulkDB.createRelationship(attrId, valueId, HAS_VALUE, Collections.emptyMap());
                }
            }
        }
    }

    /**
     * Converts a raw CSV cell into a property value.
     *
     * @param raw cell text, may be {@code null}
     * @return {@code ""} for a null or empty cell; otherwise an {@code Integer} if the text
     *         parses as an int, else a {@code Float} if it parses as a float, else the text itself
     */
    private Object parseValue(String raw) {
        if (raw == null || raw.isEmpty()) {
            return "";
        }
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException ignored) {
            // not an int - try float next
        }
        try {
            return Float.parseFloat(raw);
        } catch (NumberFormatException ignored) {
            // not numeric - keep the text as is
        }
        return raw;
    }

    /**
     * Loads temporal edge properties. Each row is {@code start,end,u_sid,prop1,prop2,...} and
     * becomes one {@code temporal} relationship between the endpoints of the static edge with
     * that {@code u_sid}, carrying the time interval and all property values.
     *
     * @param rtp    CSV file with a header line
     * @param bulkDB open batch inserter; not shut down by this method
     * @throws IllegalStateException if a row refers to an edge that was not imported
     * @throws Exception             on I/O failure or malformed time columns
     */
    private void loadEdgeTemporalFromCsv(File rtp, BatchInserter bulkDB) throws Exception {
        try (BufferedReader reader = new BufferedReader(new FileReader(rtp, StandardCharsets.UTF_8))) {
            String headerLine = reader.readLine();
            if (headerLine == null) {
                return;
            }
            List<String> propNames = propertyNames(headerLine);

            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    continue;
                }
                String[] arr = line.split(",", -1);
                if (arr.length < FIRST_PROP_COL) {
                    continue;
                }
                int st = Integer.parseInt(arr[START_TIME_COL]);
                int et = Integer.parseInt(arr[END_TIME_COL]);
                String relSid = arr[ENTITY_COL];

                Long from = rFromMap.get(relSid);
                Long to = rToMap.get(relSid);
                if (from == null || to == null) {
                    throw new IllegalStateException("edge endpoints not found. u_sid: " + relSid);
                }

                Map<String, Object> relProps = new HashMap<>();
                relProps.put("u_sid", relSid);
                relProps.put("start_time", st);
                relProps.put("end_time", et);

                for (int i = 0; i < propNames.size(); i++) {
                    int colIdx = FIRST_PROP_COL + i;
                    if (colIdx >= arr.length) {
                        continue;
                    }
                    relProps.put(propNames.get(i), parseValue(arr[colIdx]));
                }

                bulkDB.createRelationship(from, to, RELATIONSHIP_TYPE_TEMPORAL, relProps);
            }
        }
    }

    /** Builds the {@link #nAttrMap} key for an object id and a property name. */
    private String attrKey(String uSid, String prop) {
        return uSid + "|" + prop;
    }
}