diff --git a/computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/value/IdValueList.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/value/IdValueList.java index 99b9a26db..68a603523 100644 --- a/computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/value/IdValueList.java +++ b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/graph/value/IdValueList.java @@ -44,4 +44,13 @@ public void read(RandomAccessInput in) throws IOException { public void write(RandomAccessOutput out) throws IOException { this.write(out, false); } + + @Override + public IdValueList copy() { + IdValueList values = new IdValueList(); + for (IdValue value : this.values()) { + values.add(value); + } + return values; + } } diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/ComputerOutput.java b/computer-api/src/main/java/com/baidu/hugegraph/computer/core/output/ComputerOutput.java similarity index 100% rename from computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/ComputerOutput.java rename to computer-api/src/main/java/com/baidu/hugegraph/computer/core/output/ComputerOutput.java diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/ComputeManager.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/ComputeManager.java new file mode 100644 index 000000000..aea8cc6d7 --- /dev/null +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/ComputeManager.java @@ -0,0 +1,140 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute; + +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; + +import com.baidu.hugegraph.computer.core.common.ComputerContext; +import com.baidu.hugegraph.computer.core.graph.partition.PartitionStat; +import com.baidu.hugegraph.computer.core.graph.value.Value; +import com.baidu.hugegraph.computer.core.manager.Managers; +import com.baidu.hugegraph.computer.core.network.message.MessageType; +import com.baidu.hugegraph.computer.core.receiver.MessageRecvManager; +import com.baidu.hugegraph.computer.core.receiver.RecvMessageStat; +import com.baidu.hugegraph.computer.core.sender.MessageSendManager; +import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry; +import com.baidu.hugegraph.computer.core.worker.Computation; +import com.baidu.hugegraph.computer.core.worker.ComputationContext; +import com.baidu.hugegraph.computer.core.worker.WorkerStat; +import com.baidu.hugegraph.util.Log; + +public class ComputeManager> { + + private static final Logger LOG = Log.logger(ComputeManager.class); + + private final ComputerContext context; + private final Managers managers; + + private final Map> partitions; + private final Computation computation; + private final MessageRecvManager recvManager; + private final MessageSendManager sendManager; + + public ComputeManager(ComputerContext context, Managers managers, + Computation computation) { + this.context = context; + this.managers = managers; + this.computation = computation; + this.partitions = new HashMap<>(); + this.recvManager = this.managers.get(MessageRecvManager.NAME); + this.sendManager = this.managers.get(MessageSendManager.NAME); + } + + public WorkerStat input() { + WorkerStat workerStat = new WorkerStat(); + this.recvManager.waitReceivedAllMessages(); + Map> vertices; + vertices = this.recvManager.vertexPartitions(); + Map> edges; + edges = this.recvManager.edgePartitions(); + // TODO: parallel input process + for (Map.Entry> entry : + vertices.entrySet()) { + FileGraphPartition partition = new FileGraphPartition<>( + this.context, this.managers, + entry.getKey()); + PartitionStat partitionStat = partition.init(entry.getValue(), + edges.get(entry.getKey())); + workerStat.add(partitionStat); + this.partitions.put(entry.getKey(), partition); + } + return workerStat; + } + + /** + * Get compute-messages from MessageRecvManager, then put message to + * corresponding partition. Be called before + * {@link MessageRecvManager#beforeSuperstep} is called. + */ + public void takeComputeMessages() { + Map> messages = + this.recvManager.messagePartitions(); + for (FileGraphPartition partition : this.partitions.values()) { + partition.messages(messages.get(partition.partition())); + } + } + + public WorkerStat compute(ComputationContext context, int superstep) { + this.sendManager.startSend(MessageType.MSG); + WorkerStat workerStat = new WorkerStat(); + Map partitionStats = new HashMap<>( + this.partitions.size()); + if (superstep == 0) { + // TODO: parallel compute process. + for (FileGraphPartition partition : this.partitions.values()) { + PartitionStat stat = partition.compute0(context, + this.computation); + partitionStats.put(stat.partitionId(), stat); + } + } else { + // TODO: parallel compute process. + for (FileGraphPartition partition : this.partitions.values()) { + PartitionStat stat = partition.compute(context, + this.computation, + superstep); + partitionStats.put(stat.partitionId(), stat); + } + } + this.sendManager.finishSend(MessageType.MSG); + // After compute and send finish signal. + Map recvStats = + this.recvManager.recvMessageStats(); + for (Map.Entry entry : + partitionStats.entrySet()) { + PartitionStat partitionStat = entry.getValue(); + partitionStat.merge(recvStats.get(partitionStat.partitionId())); + workerStat.add(partitionStat); + } + return workerStat; + } + + public void output() { + // TODO: Write results back parallel + for (FileGraphPartition partition : this.partitions.values()) { + PartitionStat stat = partition.output(); + LOG.info("Output partition {} complete, stat='{}'", + partition.partition(), stat); + } + } +} diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/FileGraphPartition.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/FileGraphPartition.java new file mode 100644 index 000000000..53708d11b --- /dev/null +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/FileGraphPartition.java @@ -0,0 +1,393 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; + +import com.baidu.hugegraph.computer.core.common.ComputerContext; +import com.baidu.hugegraph.computer.core.common.Constants; +import com.baidu.hugegraph.computer.core.common.exception.ComputerException; +import com.baidu.hugegraph.computer.core.compute.input.EdgesInput; +import com.baidu.hugegraph.computer.core.compute.input.MessageInput; +import com.baidu.hugegraph.computer.core.compute.input.VertexInput; +import com.baidu.hugegraph.computer.core.config.ComputerOptions; +import com.baidu.hugegraph.computer.core.graph.edge.Edges; +import com.baidu.hugegraph.computer.core.graph.partition.PartitionStat; +import com.baidu.hugegraph.computer.core.graph.value.Value; +import com.baidu.hugegraph.computer.core.graph.vertex.Vertex; +import com.baidu.hugegraph.computer.core.io.BufferedFileInput; +import com.baidu.hugegraph.computer.core.io.BufferedFileOutput; +import com.baidu.hugegraph.computer.core.manager.Managers; +import com.baidu.hugegraph.computer.core.output.ComputerOutput; +import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator; +import com.baidu.hugegraph.computer.core.store.FileGenerator; +import com.baidu.hugegraph.computer.core.store.FileManager; +import com.baidu.hugegraph.computer.core.store.hgkvfile.buffer.EntryIterator; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntriesUtil; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.Pointer; +import com.baidu.hugegraph.computer.core.worker.Computation; +import com.baidu.hugegraph.computer.core.worker.ComputationContext; +import com.baidu.hugegraph.util.E; + +public class FileGraphPartition> { + + private static final String VERTEX = "vertex"; + private static final String EDGE = "edge"; + private static final String STATUS = "status"; + private static final String VALUE = "value"; + + private final ComputerContext context; + private final FileGenerator fileGenerator; + private final int partition; + private final File vertexFile; + private final File edgeFile; + + private final Value result; + + private File preStatusFile; + private File curStatusFile; + private File preValueFile; + private File curValueFile; + + private long vertexCount; + private long edgeCount; + + private BufferedFileOutput curStatusOut; + private BufferedFileOutput curValueOut; + private BufferedFileInput preStatusIn; + private BufferedFileInput preValueIn; + private VertexInput vertexInput; + private EdgesInput edgesInput; + private MessageInput messageInput; + + public FileGraphPartition(ComputerContext context, + Managers managers, + int partition) { + this.context = context; + this.fileGenerator = managers.get(FileManager.NAME); + this.partition = partition; + this.vertexFile = new File(this.fileGenerator.randomDirectory(VERTEX)); + this.edgeFile = new File(this.fileGenerator.randomDirectory(EDGE)); + this.result = this.context.config().createObject( + ComputerOptions.ALGORITHM_RESULT_CLASS); + this.vertexCount = 0L; + this.edgeCount = 0L; + } + + public PartitionStat init(PeekableIterator vertices, + PeekableIterator edges) { + if (edges == null) { + edges = PeekableIterator.emptyIterator(); + } + try { + createFile(this.vertexFile); + createFile(this.edgeFile); + BufferedFileOutput vertexOut = new BufferedFileOutput( + this.vertexFile); + BufferedFileOutput edgeOut = new BufferedFileOutput(this.edgeFile); + while (vertices.hasNext()) { + KvEntry entry = vertices.next(); + Pointer key = entry.key(); + vertexOut.writeFixedInt(key.bytes().length); + vertexOut.write(key.bytes()); + Pointer value = entry.value(); + vertexOut.writeFixedInt(value.bytes().length); + vertexOut.write(value.bytes()); + this.writeEdges(key, edges, edgeOut); + this.vertexCount++; + } + vertexOut.close(); + edgeOut.close(); + } catch (IOException e) { + throw new ComputerException( + "Failed to init FileGraphPartition '%s'", + e, this.partition); + } + + return new PartitionStat(this.partition, this.vertexCount, + this.edgeCount); + } + + protected PartitionStat compute0(ComputationContext context, + Computation computation) { + long activeVertexCount = 0L; + try { + this.beforeCompute(0); + } catch (IOException e) { + throw new ComputerException( + "Error occurred when beforeCompute at superstep 0", e); + } + while (this.vertexInput.hasNext()) { + Vertex vertex = this.vertexInput.next(); + Edges edges = this.edgesInput.edges(this.vertexInput.idPointer()); + vertex.edges(edges); + computation.compute0(context, vertex); + if (vertex.active()) { + activeVertexCount++; + } + try { + this.saveVertex(vertex); + } catch (IOException e) { + throw new ComputerException( + "Error occurred when saveVertex", e); + } + } + try { + this.afterCompute(0); + } catch (IOException e) { + throw new ComputerException("Error occurred when afterCompute", e); + } + return new PartitionStat(this.partition, this.vertexCount, + this.edgeCount, + this.vertexCount - activeVertexCount, 0L, 0L); + } + + protected PartitionStat compute(ComputationContext context, + Computation computation, + int superstep) { + try { + this.beforeCompute(superstep); + } catch (IOException e) { + throw new ComputerException( + "Error occurred when beforeCompute at superstep %s", + e, superstep); + } + long activeVertexCount = 0L; + while (this.vertexInput.hasNext()) { + Vertex vertex = this.vertexInput.next(); + Iterator messageIt = this.messageInput.iterator( + this.vertexInput.idPointer()); + try { + boolean status = this.preStatusIn.readBoolean(); + this.result.read(this.preValueIn); + vertex.value(this.result); + if (status || messageIt.hasNext()) { + vertex.reactivate(); + } else { + vertex.inactivate(); + } + } catch (IOException e) { + throw new ComputerException("Read status or result failed", e); + } + + /* + * If the vertex is inactive, it's edges will be skipped + * automatically at the next vertex. + */ + if (vertex.active()) { + Edges edges = this.edgesInput.edges( + this.vertexInput.idPointer()); + vertex.edges(edges); + computation.compute(context, vertex, messageIt); + } + + // Computation may change vertex status. + if (vertex.active()) { + activeVertexCount++; + } + try { + this.saveVertex(vertex); + } catch (IOException e) { + throw new ComputerException( + "Error occurred when saveVertex", e); + } + } + try { + this.afterCompute(superstep); + } catch (IOException e) { + throw new ComputerException( + "Error occurred when afterCompute at superstep %s", + e, superstep); + } + return new PartitionStat(this.partition, this.vertexCount, + this.edgeCount, + this.vertexCount - activeVertexCount, 0L, 0L); + } + + public PartitionStat output() { + ComputerOutput output = this.context.config().createObject( + ComputerOptions.OUTPUT_CLASS); + output.init(this.context.config(), this.partition); + try { + this.beforeOutput(); + } catch (IOException e) { + throw new ComputerException("Error occurred when beforeOutput", e); + } + + while (this.vertexInput.hasNext()) { + Vertex vertex = this.vertexInput.next(); + try { + boolean status = this.preStatusIn.readBoolean(); + this.result.read(this.preValueIn); + vertex.value(this.result); + if (status) { + vertex.reactivate(); + } else { + vertex.inactivate(); + } + } catch (IOException e) { + throw new ComputerException("Read status or result failed", e); + } + + Edges edges = this.edgesInput.edges(this.vertexInput.idPointer()); + vertex.edges(edges); + output.write(vertex); + } + + try { + this.afterOutput(); + } catch (IOException e) { + throw new ComputerException("Error occurred when afterOutput", e); + } + output.close(); + return new PartitionStat(this.partition, this.vertexCount, + this.edgeCount); + } + + /** + * Put the messages sent at previous superstep from MessageRecvManager to + * this partition. The messages is null if no messages sent to this + * partition at previous superstep. + */ + public void messages(PeekableIterator messages) { + this.messageInput = new MessageInput<>(this.context, messages); + } + + public int partition() { + return this.partition; + } + + private void saveVertex(Vertex vertex) throws IOException { + this.curStatusOut.writeBoolean(vertex.active()); + Value value = vertex.value(); + E.checkNotNull(value, "Vertex's value can't be null"); + vertex.value().write(this.curValueOut); + } + + private void writeEdges(Pointer id, PeekableIterator edges, + BufferedFileOutput edgeOut) throws IOException { + while (edges.hasNext()) { + KvEntry entry = edges.peek(); + Pointer key = entry.key(); + int status = id.compareTo(key); + if (status < 0) { + return; + } else if (status == 0) { + edges.next(); + edgeOut.writeFixedInt(id.bytes().length); + edgeOut.write(id.bytes()); + + long valuePosition = edgeOut.position(); + edgeOut.writeFixedInt(0); + + this.edgeCount += entry.numSubEntries(); + edgeOut.writeFixedInt((int) entry.numSubEntries()); + EntryIterator subKvIt = EntriesUtil.subKvIterFromEntry(entry); + while (subKvIt.hasNext()) { + KvEntry subEntry = subKvIt.next(); + // Not write sub-key length + edgeOut.write(subEntry.key().bytes()); + // Not write sub-value length + edgeOut.write(subEntry.value().bytes()); + } + long valueLength = edgeOut.position() - valuePosition - + Constants.INT_LEN; + edgeOut.writeFixedInt(valuePosition, (int) valueLength); + } else { // status > 0 + edges.next(); + } + } + } + + private void beforeCompute(int superstep) throws IOException { + this.vertexInput = new VertexInput(this.context, this.vertexFile, + this.vertexCount); + this.edgesInput = new EdgesInput(this.context, this.edgeFile); + // Inputs of vertex, edges, status, and value. + this.vertexInput.init(); + this.edgesInput.init(); + if (superstep != 0) { + this.preStatusFile = this.curStatusFile; + this.preValueFile = this.curValueFile; + this.preStatusIn = new BufferedFileInput(this.preStatusFile); + this.preValueIn = new BufferedFileInput(this.preValueFile); + } + + // Outputs of vertex's status and vertex's value. + String statusPath = this.fileGenerator.randomDirectory( + STATUS, Integer.toString(superstep), + Integer.toString(this.partition)); + String valuePath = this.fileGenerator.randomDirectory( + VALUE, Integer.toString(superstep), + Integer.toString(this.partition)); + this.curStatusFile = new File(statusPath); + this.curValueFile = new File(valuePath); + createFile(this.curStatusFile); + createFile(this.curValueFile); + + this.curStatusOut = new BufferedFileOutput(this.curStatusFile); + this.curValueOut = new BufferedFileOutput(this.curValueFile); + } + + private void afterCompute(int superstep) throws IOException { + this.vertexInput.close(); + this.edgesInput.close(); + if (superstep != 0) { + this.preStatusIn.close(); + this.preValueIn.close(); + this.preStatusFile.delete(); + this.preValueFile.delete(); + } + this.curStatusOut.close(); + this.curValueOut.close(); + } + + private void beforeOutput() throws IOException { + this.vertexInput = new VertexInput(this.context, + this.vertexFile, + this.vertexCount); + this.edgesInput = new EdgesInput(this.context, this.edgeFile); + + this.vertexInput.init(); + this.edgesInput.init(); + + this.preStatusFile = this.curStatusFile; + this.preValueFile = this.curValueFile; + this.preStatusIn = new BufferedFileInput(this.preStatusFile); + this.preValueIn = new BufferedFileInput(this.preValueFile); + } + + private void afterOutput() throws IOException { + this.vertexInput.close(); + this.edgesInput.close(); + this.preStatusIn.close(); + this.preValueIn.close(); + this.preStatusFile.delete(); + this.preValueFile.delete(); + } + + private static void createFile(File file) throws IOException { + file.getParentFile().mkdirs(); + file.createNewFile(); + } +} diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/EdgesInput.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/EdgesInput.java new file mode 100644 index 000000000..1fb69b005 --- /dev/null +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/EdgesInput.java @@ -0,0 +1,287 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute.input; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; + +import javax.annotation.Nonnull; + +import com.baidu.hugegraph.computer.core.common.ComputerContext; +import com.baidu.hugegraph.computer.core.common.Constants; +import com.baidu.hugegraph.computer.core.common.exception.ComputerException; +import com.baidu.hugegraph.computer.core.config.ComputerOptions; +import com.baidu.hugegraph.computer.core.config.EdgeFrequency; +import com.baidu.hugegraph.computer.core.graph.GraphFactory; +import com.baidu.hugegraph.computer.core.graph.edge.Edge; +import com.baidu.hugegraph.computer.core.graph.edge.Edges; +import com.baidu.hugegraph.computer.core.graph.properties.Properties; +import com.baidu.hugegraph.computer.core.io.BufferedFileInput; +import com.baidu.hugegraph.computer.core.io.RandomAccessInput; +import com.baidu.hugegraph.computer.core.io.StreamGraphInput; + +public class EdgesInput { + + private RandomAccessInput input; + private final ReusablePointer idPointer; + private final ReusablePointer valuePointer; + private final File edgeFile; + private final GraphFactory graphFactory; + private final int flushThreshold; + private final EdgeFrequency frequency; + + public EdgesInput(ComputerContext context, File edgeFile) { + this.graphFactory = context.graphFactory(); + this.idPointer = new ReusablePointer(); + this.valuePointer = new ReusablePointer(); + this.edgeFile = edgeFile; + this.flushThreshold = context.config().get( + ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX); + this.frequency = context.config().get(ComputerOptions.INPUT_EDGE_FREQ); + } + + public void init() throws IOException { + this.input = new BufferedFileInput(this.edgeFile); + } + + public void close() throws IOException { + this.input.close(); + } + + public Edges edges(ReusablePointer vidPointer) { + try { + while (this.input.available() > 0) { + long startPosition = this.input.position(); + this.idPointer.read(this.input); + int status = vidPointer.compareTo(this.idPointer); + if (status < 0) { // No edges + /* + * The current batch belong to vertex that vertex id is + * bigger than specified id. + */ + this.input.seek(startPosition); + return EmptyEdges.instance(); + } else if (status == 0) { // Has edges + this.valuePointer.read(this.input); + Edges edges = this.readEdges(this.valuePointer.input()); + if (edges.size() < this.flushThreshold) { + return edges; + } else { + return new SuperEdges(vidPointer, edges, startPosition); + } + } else { + /* + * The current batch belong to vertex that vertex id is + * smaller than specified id. + */ + int valueLength = this.input.readFixedInt(); + this.input.skip(valueLength); + } + } + return EmptyEdges.instance(); + } catch (IOException e) { + throw new ComputerException("Can't read from edges input '%s'", + e, this.edgeFile.getAbsoluteFile()); + } + } + + private class SuperEdges implements Edges { + + private final ReusablePointer vid; + private final long startPosition; + private Iterator currentEdgesIter; + private int size; + private int iterationTime; + + SuperEdges(ReusablePointer vid, Edges edges, long startPosition) { + this.vid = vid; + this.startPosition = startPosition; + this.currentEdgesIter = edges.iterator(); + this.size = 0; + this.iterationTime = 0; + } + + @Override + public int size() { + if (this.size == 0) { + this.computeSize(); + } + return this.size; + } + + private void computeSize() { + long currentPosition = EdgesInput.this.input.position(); + try { + EdgesInput.this.input.seek(this.startPosition); + EdgesInput.this.idPointer.read(EdgesInput.this.input); + while (EdgesInput.this.idPointer.compareTo(this.vid) == 0) { + long valueLength = EdgesInput.this.input.readFixedInt(); + this.size += EdgesInput.this.input.readInt(); + long bytesToSkip = valueLength - Constants.INT_LEN; + EdgesInput.this.input.skip(bytesToSkip); + if (EdgesInput.this.input.available() > 0) { + EdgesInput.this.idPointer.read(EdgesInput.this.input); + } else { + break; + } + } + EdgesInput.this.input.seek(currentPosition); + } catch (IOException e) { + throw new ComputerException("Compute size of edges failed", e); + } + } + + @Override + public void add(Edge edge) { + throw new ComputerException( + "Not support adding edges during computing"); + } + + @Override + @Nonnull + public Iterator iterator() { + try { + if (this.iterationTime != 0) { + EdgesInput.this.input.seek(this.startPosition); + } + this.iterationTime++; + } catch (IOException e) { + throw new ComputerException("Can't seek to %s", + e, this.startPosition); + } + return new EdgesIterator(); + + } + + private class EdgesIterator implements Iterator { + + @Override + public boolean hasNext() { + if (currentEdgesIter.hasNext()) { + return true; + } else { + long currentPosition = input.position(); + try { + if (input.available() > 0) { + idPointer.read(input); + if (idPointer.compareTo(vid) == 0) { + valuePointer.read(EdgesInput.this.input); + currentEdgesIter = readEdges( + valuePointer.input()) + .iterator(); + } else { + input.seek(currentPosition); + } + } + } catch (IOException e) { + throw new ComputerException( + "Error occurred when read edges from edges " + + "input '%s' at position %s", e, + edgeFile.getAbsoluteFile(), currentPosition); + } + } + return currentEdgesIter.hasNext(); + } + + @Override + public Edge next() { + return currentEdgesIter.next(); + } + } + } + + // TODO: use one reused Edges instance to read batches for each vertex. + private Edges readEdges(RandomAccessInput in) { + try { + int count = in.readFixedInt(); + Edges edges = this.graphFactory.createEdges(count); + if (this.frequency == EdgeFrequency.SINGLE) { + for (int i = 0; i < count; i++) { + Edge edge = this.graphFactory.createEdge(); + // Only use targetId as subKey, use props as subValue + edge.targetId(StreamGraphInput.readId(in)); + Properties props = this.graphFactory.createProperties(); + props.read(in); + edges.add(edge); + } + } else if (this.frequency == EdgeFrequency.SINGLE_PER_LABEL) { + for (int i = 0; i < count; i++) { + Edge edge = this.graphFactory.createEdge(); + // Use label + targetId as subKey, use props as subValue + edge.label(in.readUTF()); + edge.targetId(StreamGraphInput.readId(in)); + Properties props = this.graphFactory.createProperties(); + props.read(in); + edge.properties(props); + edges.add(edge); + } + } else { + assert this.frequency == EdgeFrequency.MULTIPLE; + for (int i = 0; i < count; i++) { + Edge edge = this.graphFactory.createEdge(); + /* + * Use label + sortValues + targetId as subKey, + * use properties as subValue + */ + edge.label(in.readUTF()); + edge.name(in.readUTF()); + edge.targetId(StreamGraphInput.readId(in)); + Properties props = this.graphFactory.createProperties(); + props.read(in); + edges.add(edge); + } + } + return edges; + } catch (IOException e) { + throw new ComputerException("Failed to read edges from input '%s'", + e, this.edgeFile.getAbsoluteFile()); + } + } + public static class EmptyEdges implements Edges { + + private static final EmptyEdges INSTANCE = new EmptyEdges(); + + private EmptyEdges() { + // pass + } + + public static EmptyEdges instance() { + return INSTANCE; + } + + @Override + public int size() { + return 0; + } + + @Override + public void add(Edge edge) { + throw new ComputerException( + "Not support adding edges during computing"); + } + + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } + } +} diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/MessageInput.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/MessageInput.java new file mode 100644 index 000000000..cdab2fee7 --- /dev/null +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/MessageInput.java @@ -0,0 +1,120 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute.input; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import com.baidu.hugegraph.computer.core.common.ComputerContext; +import com.baidu.hugegraph.computer.core.common.exception.ComputerException; +import com.baidu.hugegraph.computer.core.config.ComputerOptions; +import com.baidu.hugegraph.computer.core.graph.value.Value; +import com.baidu.hugegraph.computer.core.io.BytesInput; +import com.baidu.hugegraph.computer.core.io.IOFactory; +import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.Pointer; + +public class MessageInput> { + + private final PeekableIterator messages; + private final T value; + + public MessageInput(ComputerContext context, + PeekableIterator messages) { + if (messages == null) { + this.messages = PeekableIterator.emptyIterator(); + } else { + this.messages = messages; + } + + this.value = context.config().createObject( + ComputerOptions.ALGORITHM_MESSAGE_CLASS); + } + + public Iterator iterator(ReusablePointer vidPointer) { + while (this.messages.hasNext()) { + KvEntry entry = this.messages.peek(); + Pointer key = entry.key(); + int status = vidPointer.compareTo(key); + if (status < 0) { + return Collections.emptyIterator(); + } else if (status == 0) { + break; + } else { + this.messages.next(); + } + } + + return new MessageIterator(vidPointer); + } + + private class MessageIterator implements Iterator { + + // It indicates whether the value can be returned to client. + private boolean valueValid; + private ReusablePointer vidPointer; + + private MessageIterator(ReusablePointer vidPointer) { + this.vidPointer = vidPointer; + this.valueValid = false; + } + + @Override + public boolean hasNext() { + if (this.valueValid) { + return true; + } + if (MessageInput.this.messages.hasNext()) { + KvEntry entry = MessageInput.this.messages.peek(); + Pointer key = entry.key(); + int status = this.vidPointer.compareTo(key); + if (status == 0) { + MessageInput.this.messages.next(); + this.valueValid = true; + try { + BytesInput in = IOFactory.createBytesInput( + entry.value().bytes()); + MessageInput.this.value.read(in); + } catch (IOException e) { + throw new ComputerException("Can't read value", e); + } + return true; + } else { + return false; + } + } else { + return false; + } + } + + @Override + public T next() { + if (this.valueValid) { + this.valueValid = false; + return MessageInput.this.value; + } else { + throw new NoSuchElementException(); + } + } + } +} diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/ReusablePointer.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/ReusablePointer.java new file mode 100644 index 000000000..923175918 --- /dev/null +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/ReusablePointer.java @@ -0,0 +1,106 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute.input; + +import java.io.IOException; + +import com.baidu.hugegraph.computer.core.common.Constants; +import com.baidu.hugegraph.computer.core.common.exception.ComputerException; +import com.baidu.hugegraph.computer.core.io.IOFactory; +import com.baidu.hugegraph.computer.core.io.RandomAccessInput; +import com.baidu.hugegraph.computer.core.io.RandomAccessOutput; +import com.baidu.hugegraph.computer.core.io.Readable; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.Pointer; +import com.baidu.hugegraph.computer.core.util.BytesUtil; + +public class ReusablePointer implements Pointer, Readable { + + private int length; + private byte[] bytes; + private RandomAccessInput input; + + public ReusablePointer() { + this.bytes = Constants.EMPTY_BYTES; + this.length = 0; + this.input = IOFactory.createBytesInput(this.bytes); + } + + public ReusablePointer(byte[] bytes, int length) { + this.bytes = bytes; + this.length = length; + this.input = IOFactory.createBytesInput(this.bytes); + } + + @Override + public void read(RandomAccessInput in) throws IOException { + this.length = in.readFixedInt(); + if (this.bytes.length < this.length) { + this.bytes = new byte[this.length]; + this.input = IOFactory.createBytesInput(this.bytes); + } + in.readFully(this.bytes, 0, this.length); + } + + @Override + public RandomAccessInput input() { + try { + this.input.seek(0L); + } catch (IOException e) { + throw new ComputerException( + "ResuablePointer can't seek to position 0", e); + } + return this.input; + } + + /** + * Only [0 .. length) of the returned byte array is valid. The extra data + * [length .. bytes.length) is meaningless, may be left by previous pointer. + */ + @Override + public byte[] bytes() { + return this.bytes; + } + + @Override + public void write(RandomAccessOutput output) throws IOException { + output.writeFixedInt(this.length); + output.write(this.bytes(), 0, this.length); + } + + @Override + public long offset() { + return 0L; + } + + @Override + public long length() { + return this.length; + } + + @Override + public int compareTo(Pointer other) { + try { + return BytesUtil.compare(this.bytes(), this.length, + other.bytes(), (int) other.length()); + } catch (IOException e) { + throw new ComputerException(e.getMessage(), e); + } + } +} diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/VertexInput.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/VertexInput.java new file mode 100644 index 000000000..56c0490dc --- /dev/null +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/input/VertexInput.java @@ -0,0 +1,87 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute.input; + +import java.io.File; +import java.io.IOException; + +import com.baidu.hugegraph.computer.core.common.ComputerContext; +import com.baidu.hugegraph.computer.core.common.exception.ComputerException; +import com.baidu.hugegraph.computer.core.graph.properties.Properties; +import com.baidu.hugegraph.computer.core.graph.vertex.Vertex; +import com.baidu.hugegraph.computer.core.io.BufferedFileInput; +import com.baidu.hugegraph.computer.core.io.RandomAccessInput; +import com.baidu.hugegraph.computer.core.io.StreamGraphInput; + +public class VertexInput { + + private final long vertexCount; + private long readCount; + private RandomAccessInput input; + private final Vertex vertex; + private final ReusablePointer idPointer; + private final ReusablePointer valuePointer; + private final Properties properties; + private final File vertexFile; + + public VertexInput(ComputerContext context, + File vertexFile, + long vertexCount) { + this.vertexFile = vertexFile; + this.vertexCount = vertexCount; + this.readCount = 0L; + this.vertex = context.graphFactory().createVertex(); + this.idPointer = new ReusablePointer(); + this.valuePointer = new ReusablePointer(); + this.properties = context.graphFactory().createProperties(); + this.readCount = 0; + } + + public void init() throws IOException { + this.input = new BufferedFileInput(this.vertexFile); + } + + public void close() throws IOException { + this.input.close(); + } + + public boolean hasNext() { + return this.readCount < this.vertexCount; + } + + public Vertex next() { + this.readCount++; + try { + this.idPointer.read(this.input); + this.valuePointer.read(this.input); + this.properties.read(this.valuePointer.input()); + this.vertex.id(StreamGraphInput.readId(this.idPointer.input())); + this.vertex.properties(this.properties); + } catch (IOException e) { + throw new ComputerException("Can't read vertex from input '%s'", + e, this.vertexFile.getAbsolutePath()); + } + return this.vertex; + } + + public ReusablePointer idPointer() { + return this.idPointer; + } +} diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java index 10741f2d8..b271d4869 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/config/ComputerOptions.java @@ -33,6 +33,7 @@ import com.baidu.hugegraph.computer.core.master.DefaultMasterComputation; import com.baidu.hugegraph.computer.core.network.TransportConf; import com.baidu.hugegraph.computer.core.network.netty.NettyTransportProvider; +import com.baidu.hugegraph.computer.core.output.LogOutput; import com.baidu.hugegraph.config.ConfigConvOption; import com.baidu.hugegraph.config.ConfigListOption; import com.baidu.hugegraph.config.ConfigOption; @@ -165,6 +166,15 @@ public static synchronized ComputerOptions instance() { 4 ); + public static final ConfigOption> OUTPUT_CLASS = + new ConfigOption<>( + "output.output_class", + "The class to output the computation result of each " + + "vertex. Be called after iteration computation.", + disallowEmpty(), + LogOutput.class + ); + public static final ConfigOption OUTPUT_RESULT_NAME = new ConfigOption<>( "output.result_name", diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/partition/PartitionStat.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/partition/PartitionStat.java index df7338cf2..a6eacab6f 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/partition/PartitionStat.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/graph/partition/PartitionStat.java @@ -25,6 +25,7 @@ import com.baidu.hugegraph.computer.core.io.RandomAccessOutput; import com.baidu.hugegraph.computer.core.io.Readable; import com.baidu.hugegraph.computer.core.io.Writable; +import com.baidu.hugegraph.computer.core.receiver.RecvMessageStat; import com.baidu.hugegraph.computer.core.util.JsonUtil; public class PartitionStat implements Readable, Writable { @@ -102,6 +103,13 @@ public void write(RandomAccessOutput out) throws IOException { out.writeLong(this.messageBytes); } + public void merge(RecvMessageStat recvMessageStat) { + if (recvMessageStat != null) { + this.messageCount += recvMessageStat.messageCount(); + this.messageBytes += recvMessageStat.messageBytes(); + } + } + @Override public boolean equals(Object obj) { if (!(obj instanceof PartitionStat)) { diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/io/BufferedFileInput.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/io/BufferedFileInput.java index bd556243c..d92127b8f 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/io/BufferedFileInput.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/io/BufferedFileInput.java @@ -73,7 +73,7 @@ public void seek(long position) throws IOException { super.seek(position - bufferStart); return; } - if (position >= this.fileLength()) { + if (position > this.fileLength()) { throw new EOFException(String.format( "Can't seek to %s, reach the end of file", position)); diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/LogOutput.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/LogOutput.java new file mode 100644 index 000000000..97885c475 --- /dev/null +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/output/LogOutput.java @@ -0,0 +1,56 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.output; + +import org.slf4j.Logger; + +import com.baidu.hugegraph.computer.core.config.Config; +import com.baidu.hugegraph.computer.core.graph.vertex.Vertex; +import com.baidu.hugegraph.util.Log; + +/** + * LogOutput print the computation result to log file. + * It can't be used on production environment. + * Be used for test or development only. + */ +public class LogOutput implements ComputerOutput { + + private static final Logger LOG = Log.logger(LogOutput.class); + + private int partition; + + @Override + public void init(Config config, int partition) { + this.partition = partition; + LOG.info("Start write back partition {}", this.partition); + } + + @Override + public void write(Vertex vertex) { + LOG.info("id='{}', result='{}'", + vertex.id().toString(), + vertex.value().toString()); + } + + @Override + public void close() { + LOG.info("End write back partition {}", this.partition); + } +} diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvManager.java index 81d226c7f..972b22f16 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -31,7 +31,6 @@ import com.baidu.hugegraph.computer.core.common.exception.TransportException; import com.baidu.hugegraph.computer.core.config.ComputerOptions; import com.baidu.hugegraph.computer.core.config.Config; -import com.baidu.hugegraph.computer.core.graph.partition.PartitionStat; import com.baidu.hugegraph.computer.core.manager.Manager; import com.baidu.hugegraph.computer.core.network.ConnectionId; import com.baidu.hugegraph.computer.core.network.MessageHandler; @@ -45,7 +44,6 @@ import com.baidu.hugegraph.computer.core.store.FileManager; import com.baidu.hugegraph.computer.core.store.SuperstepFileGenerator; import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry; -import com.baidu.hugegraph.computer.core.worker.WorkerStat; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.Log; @@ -115,7 +113,6 @@ public void beforeSuperstep(Config config, int superstep) { @Override public void afterSuperstep(Config config, int superstep) { - // pass } @Override @@ -136,19 +133,6 @@ public void exceptionCaught(TransportException cause, connectionId, cause); } - /** - * Merge vertices and edges in each partition parallel, and get the - * workerStat. Be called at the end of input superstep. - */ - public WorkerStat mergeGraph() { - // TODO: Merge vertices and edges in each partition parallel - PartitionStat stat1 = new PartitionStat(0, 100L, 200L, - 50L, 60L, 70L); - WorkerStat workerStat = new WorkerStat(); - workerStat.add(stat1); - return workerStat; - } - public void waitReceivedAllMessages() { try { boolean status = this.finishMessagesLatch.await( @@ -233,4 +217,11 @@ public Map> messagePartitions() { this.messagePartitions = null; return partitions.iterators(); } + + public Map recvMessageStats() { + this.waitReceivedAllMessages(); + E.checkState(this.messagePartitions != null, + "The messagePartitions can't be null"); + return this.messagePartitions.recvMessageStats(); + } } diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvPartition.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvPartition.java index a30b8f885..4496dd33a 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvPartition.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvPartition.java @@ -115,6 +115,10 @@ public long totalBytes() { return this.totalBytes; } + public RecvMessageStat recvMessageStat() { + return new RecvMessageStat(0L, this.totalBytes); + } + protected abstract OuterSortFlusher outerSortFlusher(); protected abstract String type(); diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvPartitions.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvPartitions.java index a390eeff9..2f4c7b3d8 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvPartitions.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/MessageRecvPartitions.java @@ -50,7 +50,7 @@ public MessageRecvPartitions(ComputerContext context, this.partitions = new HashMap<>(); } - public abstract P createPartition(); + protected abstract P createPartition(); public void addBuffer(int partitionId, ManagedBuffer buffer) { P partition = this.partition(partitionId); @@ -82,4 +82,12 @@ public Map> iterators() { } return entries; } + + public Map recvMessageStats() { + Map entries = new HashMap<>(); + for (Map.Entry entry : this.partitions.entrySet()) { + entries.put(entry.getKey(), entry.getValue().recvMessageStat()); + } + return entries; + } } diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/RecvMessageStat.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/RecvMessageStat.java new file mode 100644 index 000000000..01bf2f3e0 --- /dev/null +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/receiver/RecvMessageStat.java @@ -0,0 +1,42 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.receiver; + +/** + * Received message stat for a partition. + */ +public class RecvMessageStat { + + private final long messageCount; + private final long messageBytes; + + public RecvMessageStat(long messageCount, long messageBytes) { + this.messageCount = messageCount; + this.messageBytes = messageBytes; + } + + public long messageCount() { + return this.messageCount; + } + + public long messageBytes() { + return this.messageBytes; + } +} diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/sort/sorting/SortManager.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/sort/sorting/SortManager.java index b20e3b228..2a67ec7cb 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/sort/sorting/SortManager.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/sort/sorting/SortManager.java @@ -117,7 +117,8 @@ public CompletableFuture sort(MessageType type, throw new ComputerException("Failed to sort buffers of %s " + "message", e, type.name()); } - return ByteBuffer.wrap(output.buffer()); + + return ByteBuffer.wrap(output.buffer(), 0, (int) output.position()); }, this.sortExecutor); } diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/hgkvfile/entry/InlinePointer.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/hgkvfile/entry/InlinePointer.java index 916e4c590..a37dd0385 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/hgkvfile/entry/InlinePointer.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/store/hgkvfile/entry/InlinePointer.java @@ -71,7 +71,8 @@ public long length() { @Override public int compareTo(Pointer other) { try { - return BytesUtil.compare(this.bytes(), other.bytes()); + return BytesUtil.compare(this.bytes(), (int) this.length, + other.bytes(), (int) other.length()); } catch (IOException e) { throw new ComputerException(e.getMessage(), e); } diff --git a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/worker/WorkerService.java b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/worker/WorkerService.java index 3278f5069..c89ec2471 100644 --- a/computer-core/src/main/java/com/baidu/hugegraph/computer/core/worker/WorkerService.java +++ b/computer-core/src/main/java/com/baidu/hugegraph/computer/core/worker/WorkerService.java @@ -33,12 +33,12 @@ import com.baidu.hugegraph.computer.core.common.ComputerContext; import com.baidu.hugegraph.computer.core.common.Constants; import com.baidu.hugegraph.computer.core.common.ContainerInfo; +import com.baidu.hugegraph.computer.core.compute.ComputeManager; import com.baidu.hugegraph.computer.core.config.ComputerOptions; import com.baidu.hugegraph.computer.core.config.Config; import com.baidu.hugegraph.computer.core.graph.SuperstepStat; import com.baidu.hugegraph.computer.core.graph.edge.Edge; import com.baidu.hugegraph.computer.core.graph.id.Id; -import com.baidu.hugegraph.computer.core.graph.partition.PartitionStat; import com.baidu.hugegraph.computer.core.graph.value.Value; import com.baidu.hugegraph.computer.core.graph.vertex.Vertex; import com.baidu.hugegraph.computer.core.input.WorkerInputManager; @@ -66,6 +66,7 @@ public class WorkerService { private boolean inited; private Config config; private Bsp4Worker bsp4Worker; + private ComputeManager computeManager; private ContainerInfo workerInfo; private Computation computation; @@ -127,6 +128,8 @@ public void init(Config config) { dm.connect(worker.id(), worker.hostname(), worker.dataPort()); } + this.computeManager = new ComputeManager(this.context, this.managers, + this.computation); this.managers.initedAll(this.config); LOG.info("{} WorkerService initialized", this); this.inited = true; @@ -184,7 +187,9 @@ public void execute() { WorkerContext context = new SuperstepContext(superstep, superstepStat); LOG.info("Start computation of superstep {}", superstep); - + if (superstep > 0) { + this.computeManager.takeComputeMessages(); + } /* * Call beforeSuperstep() before all workers compute() called. * @@ -200,14 +205,8 @@ public void execute() { */ this.bsp4Worker.workerStepPrepareDone(superstep); this.bsp4Worker.waitMasterStepPrepareDone(superstep); - - WorkerStat workerStat = this.compute(); - - /* - * Wait for all workers to do compute() - */ - MessageRecvManager messageRecvManager = - this.managers.get(MessageRecvManager.NAME); + WorkerStat workerStat = this.computeManager.compute(context, + superstep); this.bsp4Worker.workerStepComputeDone(superstep); this.bsp4Worker.waitMasterStepComputeDone(superstep); @@ -314,10 +313,7 @@ private SuperstepStat inputstep() { this.bsp4Worker.workerInputDone(); this.bsp4Worker.waitMasterInputDone(); - MessageRecvManager recvManager = - this.managers.get(MessageRecvManager.NAME); - - WorkerStat workerStat = recvManager.mergeGraph(); + WorkerStat workerStat = this.computeManager.input(); this.bsp4Worker.workerStepDone(Constants.INPUT_SUPERSTEP, workerStat); @@ -333,28 +329,11 @@ private SuperstepStat inputstep() { * can exit successfully. */ private void outputstep() { - /* - * Write results back parallel - */ - // TODO: output the vertices in partitions parallel + this.computeManager.output(); this.bsp4Worker.workerOutputDone(); LOG.info("{} WorkerService outputstep finished", this); } - /** - * Compute vertices of all partitions parallel in this worker. - * Be called one time for a superstep. - * @return WorkerStat - */ - private WorkerStat compute() { - // TODO: compute partitions parallel and get workerStat - PartitionStat stat1 = new PartitionStat(0, 100L, 200L, - 50L, 60L, 70L); - WorkerStat workerStat = new WorkerStat(); - workerStat.add(stat1); - return workerStat; - } - private class SuperstepContext implements WorkerContext { private final int superstep; diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/ComputeManagerTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/ComputeManagerTest.java new file mode 100644 index 000000000..af9d0bba2 --- /dev/null +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/ComputeManagerTest.java @@ -0,0 +1,248 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Random; +import java.util.function.Consumer; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.baidu.hugegraph.computer.core.common.Constants; +import com.baidu.hugegraph.computer.core.config.ComputerOptions; +import com.baidu.hugegraph.computer.core.config.Config; +import com.baidu.hugegraph.computer.core.config.Null; +import com.baidu.hugegraph.computer.core.graph.edge.Edge; +import com.baidu.hugegraph.computer.core.graph.edge.Edges; +import com.baidu.hugegraph.computer.core.graph.id.Id; +import com.baidu.hugegraph.computer.core.graph.id.LongId; +import com.baidu.hugegraph.computer.core.graph.properties.Properties; +import com.baidu.hugegraph.computer.core.graph.value.IdValueList; +import com.baidu.hugegraph.computer.core.graph.value.IdValueListList; +import com.baidu.hugegraph.computer.core.graph.value.LongValue; +import com.baidu.hugegraph.computer.core.graph.vertex.Vertex; +import com.baidu.hugegraph.computer.core.io.BytesOutput; +import com.baidu.hugegraph.computer.core.io.IOFactory; +import com.baidu.hugegraph.computer.core.manager.Managers; +import com.baidu.hugegraph.computer.core.network.ConnectionId; +import com.baidu.hugegraph.computer.core.network.buffer.ManagedBuffer; +import com.baidu.hugegraph.computer.core.network.message.MessageType; +import com.baidu.hugegraph.computer.core.receiver.MessageRecvManager; +import com.baidu.hugegraph.computer.core.receiver.ReceiverUtil; +import com.baidu.hugegraph.computer.core.sender.MessageSendManager; +import com.baidu.hugegraph.computer.core.sort.sorting.SortManager; +import com.baidu.hugegraph.computer.core.store.FileManager; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntryOutput; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntryOutputImpl; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntryWriter; +import com.baidu.hugegraph.computer.core.worker.Computation; +import com.baidu.hugegraph.computer.suite.unit.UnitTestBase; + +public class ComputeManagerTest extends UnitTestBase { + + private static final Random RANDOM = new Random(1); + + private Config config; + private Managers managers; + private ConnectionId connectionId; + private ComputeManager computeManager; + + @Before + public void setup() { + this.config = UnitTestBase.updateWithRequiredOptions( + ComputerOptions.JOB_ID, "local_001", + ComputerOptions.JOB_WORKERS_COUNT, "1", + ComputerOptions.JOB_PARTITIONS_COUNT, "2", + ComputerOptions.BSP_MAX_SUPER_STEP, "2", + ComputerOptions.WORKER_COMBINER_CLASS, + Null.class.getName(), // Can't combine + ComputerOptions.ALGORITHM_RESULT_CLASS, + IdValueListList.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + IdValueList.class.getName(), + ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]", + ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "10000", + ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT, "1000", + ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "10", + ComputerOptions.WORKER_COMPUTATION_CLASS, + MockComputation.class.getName() + ); + + this.managers = new Managers(); + FileManager fileManager = new FileManager(); + this.managers.add(fileManager); + SortManager sortManager = new SortManager(context()); + this.managers.add(sortManager); + + MessageSendManager sendManager = new MessageSendManager( + context(), sortManager, + new MockMessageSender()); + this.managers.add(sendManager); + MessageRecvManager receiveManager = new MessageRecvManager(context(), + fileManager, + sortManager); + this.managers.add(receiveManager); + this.managers.initAll(this.config); + this.connectionId = new ConnectionId(new InetSocketAddress("localhost", + 8081), + 0); + Computation computation = this.config.createObject( + ComputerOptions.WORKER_COMPUTATION_CLASS); + this.computeManager = new ComputeManager(context(), this.managers, + computation); + } + + @After + public void teardown() { + this.managers.closeAll(this.config); + } + + @Test + public void testProcess() throws IOException { + MessageRecvManager receiveManager = this.managers.get( + MessageRecvManager.NAME); + receiveManager.onStarted(this.connectionId); + add200VertexBuffer((ManagedBuffer buffer) -> { + receiveManager.handle(MessageType.VERTEX, 0, buffer); + }); + // Partition 1 only has vertex. + add200VertexBuffer((ManagedBuffer buffer) -> { + receiveManager.handle(MessageType.VERTEX, 1, buffer); + }); + receiveManager.onFinished(this.connectionId); + receiveManager.onStarted(this.connectionId); + addSingleFreqEdgeBuffer((ManagedBuffer buffer) -> { + receiveManager.handle(MessageType.EDGE, 0, buffer); + }); + receiveManager.onFinished(this.connectionId); + this.computeManager.input(); + + // Superstep 0 + receiveManager.beforeSuperstep(this.config, 0); + receiveManager.onStarted(this.connectionId); + addMessages((ManagedBuffer buffer) -> { + receiveManager.handle(MessageType.MSG, 0, buffer); + }); + receiveManager.onFinished(this.connectionId); + this.computeManager.compute(null, 0); + receiveManager.afterSuperstep(this.config, 0); + + // Superstep 1 + this.computeManager.takeComputeMessages(); + receiveManager.beforeSuperstep(this.config, 1); + receiveManager.onStarted(this.connectionId); + receiveManager.onFinished(this.connectionId); + this.computeManager.compute(null, 1); + receiveManager.afterSuperstep(this.config, 1); + + // Output + this.computeManager.output(); + } + + private static void add200VertexBuffer(Consumer consumer) + throws IOException { + for (long i = 0L; i < 200L; i += 2) { + Vertex vertex = graphFactory().createVertex(); + vertex.id(new LongId(i)); + vertex.properties(graphFactory().createProperties()); + ReceiverUtil.comsumeBuffer(writeVertex(vertex), consumer); + } + } + + private static byte[] writeVertex(Vertex vertex) throws IOException { + BytesOutput bytesOutput = IOFactory.createBytesOutput( + Constants.SMALL_BUF_SIZE); + EntryOutput entryOutput = new EntryOutputImpl(bytesOutput); + + entryOutput.writeEntry(out -> { + out.writeByte(vertex.id().type().code()); + vertex.id().write(out); + }, out -> { + vertex.properties().write(out); + }); + + return bytesOutput.toByteArray(); + } + + private static void addSingleFreqEdgeBuffer( + Consumer consumer) throws IOException { + for (long i = 0L; i < 200L; i++) { + Vertex vertex = graphFactory().createVertex(); + vertex.id(new LongId(i)); + int count = RANDOM.nextInt(20); + if (count == 0) { + continue; + } + Edges edges = graphFactory().createEdges(count); + + for (long j = 0; j < count; j++) { + Edge edge = graphFactory().createEdge(); + edge.targetId(new LongId(RANDOM.nextInt(200))); + Properties properties = graphFactory().createProperties(); + properties.put("p1", new LongValue(i)); + edge.properties(properties); + edges.add(edge); + } + vertex.edges(edges); + ReceiverUtil.comsumeBuffer(writeEdges(vertex), consumer); + } + } + + private static byte[] writeEdges(Vertex vertex) throws IOException { + BytesOutput bytesOutput = IOFactory.createBytesOutput( + Constants.SMALL_BUF_SIZE); + EntryOutput entryOutput = new EntryOutputImpl(bytesOutput); + + Id id = vertex.id(); + KvEntryWriter subKvWriter = entryOutput.writeEntry(out -> { + out.writeByte(id.type().code()); + id.write(out); + }); + for (Edge edge : vertex.edges()) { + Id targetId = edge.targetId(); + subKvWriter.writeSubKv(out -> { + out.writeByte(targetId.type().code()); + targetId.write(out); + }, out -> { + edge.properties().write(out); + }); + } + subKvWriter.writeFinish(); + return bytesOutput.toByteArray(); + } + + private static void addMessages(Consumer consumer) + throws IOException { + for (long i = 0L; i < 200L; i++) { + int count = RANDOM.nextInt(5); + for (int j = 0; j < count; j++) { + Id id = new LongId(i); + IdValueList message = new IdValueList(); + message.add(id.idValue()); + ReceiverUtil.comsumeBuffer(ReceiverUtil.writeMessage(id, + message), + consumer); + } + } + } +} diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/ComputeTestSuite.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/ComputeTestSuite.java new file mode 100644 index 000000000..7ed1b0a0b --- /dev/null +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/ComputeTestSuite.java @@ -0,0 +1,37 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute; + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +import com.baidu.hugegraph.computer.core.compute.input.EdgesInputTest; +import com.baidu.hugegraph.computer.core.compute.input.MessageInputTest; +import com.baidu.hugegraph.computer.core.compute.input.ResuablePointerTest; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + EdgesInputTest.class, + ResuablePointerTest.class, + MessageInputTest.class, + ComputeManagerTest.class +}) +public class ComputeTestSuite { +} diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/MockComputation.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/MockComputation.java new file mode 100644 index 000000000..9bb81d4d5 --- /dev/null +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/MockComputation.java @@ -0,0 +1,85 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute; + +import java.util.Iterator; +import java.util.Random; + +import org.junit.Assert; + +import com.baidu.hugegraph.computer.core.graph.edge.Edge; +import com.baidu.hugegraph.computer.core.graph.edge.Edges; +import com.baidu.hugegraph.computer.core.graph.value.IdValueList; +import com.baidu.hugegraph.computer.core.graph.value.IdValueListList; +import com.baidu.hugegraph.computer.core.graph.vertex.Vertex; +import com.baidu.hugegraph.computer.core.worker.Computation; +import com.baidu.hugegraph.computer.core.worker.ComputationContext; + +public class MockComputation implements Computation { + + private static final String NAME = "MockComputation"; + private static final String CATEGORY = "Mock"; + private static final Random RANDOM = new Random(1001L); + + @Override + public String name() { + return NAME; + } + + @Override + public String category() { + return CATEGORY; + } + + @Override + public void compute0(ComputationContext context, Vertex vertex) { + IdValueListList value = new IdValueListList(); + vertex.value(value); + Edges edges = vertex.edges(); + checkEdges(edges); + checkEdges(edges); + if (RANDOM.nextInt() % 10 == 0) { + vertex.inactivate(); + } + } + + @Override + public void compute(ComputationContext context, Vertex vertex, + Iterator messages) { + IdValueListList value = vertex.value(); + while (messages.hasNext()) { + Assert.assertTrue(messages.hasNext()); + value.add(messages.next().copy()); + } + Assert.assertFalse(messages.hasNext()); + if (RANDOM.nextInt() % 10 == 0) { + vertex.inactivate(); + } + } + + private static void checkEdges(Edges edges) { + int edgeSize = edges.size(); + int edgeIndex = 0; + for (Edge edge : edges) { + edgeIndex++; + } + Assert.assertEquals(edgeSize, edgeIndex); + } +} diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/MockMessageSender.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/MockMessageSender.java new file mode 100644 index 000000000..b0845fd9c --- /dev/null +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/MockMessageSender.java @@ -0,0 +1,41 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute; + +import java.util.concurrent.CompletableFuture; + +import com.baidu.hugegraph.computer.core.network.message.MessageType; +import com.baidu.hugegraph.computer.core.sender.MessageSender; +import com.baidu.hugegraph.computer.core.sender.QueuedMessage; + +public class MockMessageSender implements MessageSender { + + @Override + public CompletableFuture send(int workerId, MessageType type) { + CompletableFuture future = new CompletableFuture<>(); + future.complete(null); + return future; + } + + @Override + public void send(int workerId, QueuedMessage message) { + // pass + } +} diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/input/EdgesInputTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/input/EdgesInputTest.java new file mode 100644 index 000000000..d577b4981 --- /dev/null +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/input/EdgesInputTest.java @@ -0,0 +1,315 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute.input; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Iterator; +import java.util.function.Consumer; + +import org.junit.After; +import org.junit.Test; + +import com.baidu.hugegraph.computer.core.common.Constants; +import com.baidu.hugegraph.computer.core.common.exception.ComputerException; +import com.baidu.hugegraph.computer.core.compute.FileGraphPartition; +import com.baidu.hugegraph.computer.core.compute.MockMessageSender; +import com.baidu.hugegraph.computer.core.config.ComputerOptions; +import com.baidu.hugegraph.computer.core.config.Config; +import com.baidu.hugegraph.computer.core.config.EdgeFrequency; +import com.baidu.hugegraph.computer.core.config.Null; +import com.baidu.hugegraph.computer.core.graph.edge.Edge; +import com.baidu.hugegraph.computer.core.graph.edge.Edges; +import com.baidu.hugegraph.computer.core.graph.id.Id; +import com.baidu.hugegraph.computer.core.graph.id.LongId; +import com.baidu.hugegraph.computer.core.graph.properties.Properties; +import com.baidu.hugegraph.computer.core.graph.value.IdValueList; +import com.baidu.hugegraph.computer.core.graph.value.IdValueListList; +import com.baidu.hugegraph.computer.core.graph.value.LongValue; +import com.baidu.hugegraph.computer.core.graph.vertex.Vertex; +import com.baidu.hugegraph.computer.core.io.BytesOutput; +import com.baidu.hugegraph.computer.core.io.IOFactory; +import com.baidu.hugegraph.computer.core.manager.Managers; +import com.baidu.hugegraph.computer.core.network.ConnectionId; +import com.baidu.hugegraph.computer.core.network.buffer.ManagedBuffer; +import com.baidu.hugegraph.computer.core.network.message.MessageType; +import com.baidu.hugegraph.computer.core.receiver.MessageRecvManager; +import com.baidu.hugegraph.computer.core.receiver.ReceiverUtil; +import com.baidu.hugegraph.computer.core.sender.MessageSendManager; +import com.baidu.hugegraph.computer.core.sort.sorting.SortManager; +import com.baidu.hugegraph.computer.core.store.FileManager; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntryOutput; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntryOutputImpl; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntryWriter; +import com.baidu.hugegraph.computer.suite.unit.UnitTestBase; +import com.baidu.hugegraph.testutil.Assert; +import com.baidu.hugegraph.testutil.Whitebox; + +public class EdgesInputTest extends UnitTestBase { + + private Config config; + private Managers managers; + + @After + public void teardown() { + if (this.managers != null) { + this.managers.closeAll(this.config); + } + } + + @Test + public void testSingle() throws IOException { + this.testEdgeFreq(EdgeFrequency.SINGLE); + } + + @Test + public void testSinglePerLabel() throws IOException { + this.testEdgeFreq(EdgeFrequency.SINGLE_PER_LABEL); + } + + @Test + public void testMultiple() throws IOException { + this.testEdgeFreq(EdgeFrequency.MULTIPLE); + } + + @Test + public void testEmptyEdges() { + EdgesInput.EmptyEdges edges = EdgesInput.EmptyEdges.instance(); + Iterator it = edges.iterator(); + Assert.assertFalse(it.hasNext()); + Assert.assertEquals(0, edges.size()); + Assert.assertThrows(ComputerException.class, () -> { + edges.add(graphFactory().createEdge()); + }, e -> { + Assert.assertContains("Not support adding edges during computing", + e.getMessage()); + }); + } + + private void testEdgeFreq(EdgeFrequency freq) + throws IOException { + this.config = UnitTestBase.updateWithRequiredOptions( + ComputerOptions.JOB_ID, "local_001", + ComputerOptions.JOB_WORKERS_COUNT, "1", + ComputerOptions.JOB_PARTITIONS_COUNT, "1", + ComputerOptions.WORKER_COMBINER_CLASS, + Null.class.getName(), // Can't combine + ComputerOptions.ALGORITHM_RESULT_CLASS, + IdValueListList.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + IdValueList.class.getName(), + ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]", + ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "10000", + ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT, "1000", + ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "10", + ComputerOptions.INPUT_EDGE_FREQ, freq.name() + ); + this.managers = new Managers(); + FileManager fileManager = new FileManager(); + this.managers.add(fileManager); + SortManager sortManager = new SortManager(context()); + this.managers.add(sortManager); + + MessageSendManager sendManager = new MessageSendManager( + context(), sortManager, + new MockMessageSender()); + this.managers.add(sendManager); + MessageRecvManager receiveManager = new MessageRecvManager(context(), + fileManager, + sortManager); + this.managers.add(receiveManager); + this.managers.initAll(this.config); + ConnectionId connectionId = new ConnectionId(new InetSocketAddress( + "localhost", 8081), + 0); + FileGraphPartition partition = new FileGraphPartition(context(), + this.managers, + 0); + receiveManager.onStarted(connectionId); + add200VertexBuffer((ManagedBuffer buffer) -> { + receiveManager.handle(MessageType.VERTEX, 0, buffer); + }); + receiveManager.onFinished(connectionId); + receiveManager.onStarted(connectionId); + addEdgeBuffer((ManagedBuffer buffer) -> { + receiveManager.handle(MessageType.EDGE, 0, buffer); + }, freq); + + receiveManager.onFinished(connectionId); + partition.init(receiveManager.vertexPartitions().get(0), + receiveManager.edgePartitions().get(0)); + File edgeFile = Whitebox.getInternalState(partition, "edgeFile"); + EdgesInput edgesInput = new EdgesInput(context(), edgeFile); + edgesInput.init(); + this.checkEdgesInput(edgesInput, freq); + edgesInput.close(); + } + + private static void add200VertexBuffer(Consumer consumer) + throws IOException { + for (long i = 0L; i < 200L; i += 2) { + Vertex vertex = graphFactory().createVertex(); + vertex.id(new LongId(i)); + vertex.properties(graphFactory().createProperties()); + ReceiverUtil.comsumeBuffer(writeVertex(vertex), consumer); + } + } + + private static byte[] writeVertex(Vertex vertex) throws IOException { + BytesOutput bytesOutput = IOFactory.createBytesOutput( + Constants.SMALL_BUF_SIZE); + EntryOutput entryOutput = new EntryOutputImpl(bytesOutput); + + entryOutput.writeEntry(out -> { + out.writeByte(vertex.id().type().code()); + vertex.id().write(out); + }, out -> { + vertex.properties().write(out); + }); + + return bytesOutput.toByteArray(); + } + + private static void addEdgeBuffer(Consumer consumer, + EdgeFrequency freq) throws IOException { + for (long i = 0L; i < 200L; i++) { + Vertex vertex = graphFactory().createVertex(); + vertex.id(new LongId(i)); + int count = (int) i; + if (count == 0) { + continue; + } + Edges edges = graphFactory().createEdges(count); + + for (long j = 0; j < count; j++) { + Edge edge = graphFactory().createEdge(); + switch (freq) { + case SINGLE: + edge.targetId(new LongId(j)); + break; + case SINGLE_PER_LABEL: + edge.label(String.valueOf(j)); + edge.targetId(new LongId(j)); + break; + case MULTIPLE: + edge.name(String.valueOf(j)); + edge.label(String.valueOf(j)); + edge.targetId(new LongId(j)); + break; + default: + throw new ComputerException( + "Illegal edge frequency %s", freq); + } + + Properties properties = graphFactory().createProperties(); + properties.put("p1", new LongValue(i)); + edge.properties(properties); + edges.add(edge); + } + vertex.edges(edges); + ReceiverUtil.comsumeBuffer(writeEdges(vertex, freq), consumer); + } + } + + private static byte[] writeEdges(Vertex vertex, EdgeFrequency freq) + throws IOException { + BytesOutput bytesOutput = IOFactory.createBytesOutput( + Constants.SMALL_BUF_SIZE); + EntryOutput entryOutput = new EntryOutputImpl(bytesOutput); + + Id id = vertex.id(); + KvEntryWriter subKvWriter = entryOutput.writeEntry(out -> { + out.writeByte(id.type().code()); + id.write(out); + }); + for (Edge edge : vertex.edges()) { + Id targetId = edge.targetId(); + subKvWriter.writeSubKv(out -> { + switch (freq) { + case SINGLE: + out.writeByte(targetId.type().code()); + targetId.write(out); + break; + case SINGLE_PER_LABEL: + out.writeUTF(edge.label()); + out.writeByte(targetId.type().code()); + targetId.write(out); + break; + case MULTIPLE: + out.writeUTF(edge.label()); + out.writeUTF(edge.name()); + out.writeByte(targetId.type().code()); + targetId.write(out); + break; + default: + throw new ComputerException( + "Illegal edge frequency %s", freq); + } + }, out -> { + edge.properties().write(out); + }); + } + subKvWriter.writeFinish(); + return bytesOutput.toByteArray(); + } + + private void checkEdgesInput(EdgesInput edgesInput, EdgeFrequency freq) + throws IOException { + + for (long i = 0L; i < 200L; i += 2) { + LongId id = new LongId(i); + ReusablePointer idPointer = idToReusablePointer(id); + Edges edges = edgesInput.edges(idPointer); + Iterator edgesIt = edges.iterator(); + Assert.assertEquals(i, edges.size()); + for (int j = 0; j < edges.size(); j++) { + Assert.assertTrue(edgesIt.hasNext()); + Edge edge = edgesIt.next(); + switch (freq) { + case SINGLE: + Assert.assertEquals(new LongId(j), edge.targetId()); + break; + case SINGLE_PER_LABEL: + Assert.assertEquals(new LongId(j), edge.targetId()); + Assert.assertEquals(String.valueOf(j), edge.label()); + break; + case MULTIPLE: + Assert.assertEquals(new LongId(j), edge.targetId()); + Assert.assertEquals(String.valueOf(j), edge.label()); + Assert.assertEquals(String.valueOf(j), edge.name()); + break; + default: + throw new ComputerException( + "Illegal edge frequency %s", freq); + } + } + Assert.assertFalse(edgesIt.hasNext()); + } + } + + public static ReusablePointer idToReusablePointer(Id id) + throws IOException { + BytesOutput output = IOFactory.createBytesOutput(9); + output.writeByte(id.type().code()); + id.write(output); + return new ReusablePointer(output.buffer(), (int) output.position()); + } +} diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/input/MessageInputTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/input/MessageInputTest.java new file mode 100644 index 000000000..b7527877e --- /dev/null +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/input/MessageInputTest.java @@ -0,0 +1,175 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute.input; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.function.Consumer; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.baidu.hugegraph.computer.core.compute.MockComputation; +import com.baidu.hugegraph.computer.core.config.ComputerOptions; +import com.baidu.hugegraph.computer.core.config.Config; +import com.baidu.hugegraph.computer.core.config.Null; +import com.baidu.hugegraph.computer.core.graph.id.Id; +import com.baidu.hugegraph.computer.core.graph.id.LongId; +import com.baidu.hugegraph.computer.core.graph.value.IdValueList; +import com.baidu.hugegraph.computer.core.graph.value.IdValueListList; +import com.baidu.hugegraph.computer.core.manager.Managers; +import com.baidu.hugegraph.computer.core.network.ConnectionId; +import com.baidu.hugegraph.computer.core.network.buffer.ManagedBuffer; +import com.baidu.hugegraph.computer.core.network.message.MessageType; +import com.baidu.hugegraph.computer.core.receiver.MessageRecvManager; +import com.baidu.hugegraph.computer.core.receiver.ReceiverUtil; +import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator; +import com.baidu.hugegraph.computer.core.sort.sorting.SortManager; +import com.baidu.hugegraph.computer.core.store.FileManager; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry; +import com.baidu.hugegraph.computer.suite.unit.UnitTestBase; +import com.baidu.hugegraph.testutil.Assert; + +public class MessageInputTest extends UnitTestBase { + + private Config config; + private Managers managers; + private ConnectionId connectionId; + + @Before + public void setup() { + this.config = UnitTestBase.updateWithRequiredOptions( + ComputerOptions.JOB_ID, "local_001", + ComputerOptions.JOB_WORKERS_COUNT, "1", + ComputerOptions.JOB_PARTITIONS_COUNT, "2", + ComputerOptions.BSP_MAX_SUPER_STEP, "2", + ComputerOptions.WORKER_COMBINER_CLASS, + Null.class.getName(), // Can't combine + ComputerOptions.ALGORITHM_RESULT_CLASS, + IdValueListList.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + IdValueList.class.getName(), + ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]", + ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "10000", + ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT, "1000", + ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "10", + ComputerOptions.WORKER_COMPUTATION_CLASS, + MockComputation.class.getName() + ); + + this.managers = new Managers(); + FileManager fileManager = new FileManager(); + this.managers.add(fileManager); + SortManager sortManager = new SortManager(context()); + this.managers.add(sortManager); + + MessageRecvManager receiveManager = new MessageRecvManager(context(), + fileManager, + sortManager); + this.managers.add(receiveManager); + this.managers.initAll(this.config); + this.connectionId = new ConnectionId(new InetSocketAddress("localhost", + 8081), + 0); + } + + @After + public void teardown() { + this.managers.closeAll(this.config); + } + + @Test + public void testMessageInput() throws IOException { + MessageRecvManager receiveManager = this.managers.get( + MessageRecvManager.NAME); + receiveManager.onStarted(this.connectionId); + + // Superstep 0 + receiveManager.beforeSuperstep(this.config, 0); + receiveManager.onStarted(this.connectionId); + addMessages((ManagedBuffer buffer) -> { + receiveManager.handle(MessageType.MSG, 0, buffer); + }); + receiveManager.onFinished(this.connectionId); + PeekableIterator it = receiveManager.messagePartitions() + .get(0); + MessageInput input = new MessageInput<>(context(), it); + Map> expectedMessages = expectedMessages(); + checkMessages(expectedMessages, input); + } + + private void checkMessages(Map> expectedMessages, + MessageInput input) throws IOException { + for (long i = 0L; i < 200L; i++) { + List messages = expectedMessages.get(new LongId(i)); + LongId id = new LongId(i); + ReusablePointer idPointer = EdgesInputTest.idToReusablePointer(id); + Iterator mit = input.iterator(idPointer); + if (messages == null) { + Assert.assertFalse(mit.hasNext()); + } else { + for (int j = 0; j < messages.size();j++) { + Assert.assertTrue(mit.hasNext()); + Assert.assertTrue(messages.contains(mit.next())); + } + } + } + } + + private static void addMessages(Consumer consumer) + throws IOException { + Random random = new Random(1); + for (long i = 0L; i < 200L; i++) { + int count = random.nextInt(5); + for (int j = 0; j < count; j++) { + Id id = new LongId(random.nextInt(200)); + IdValueList message = new IdValueList(); + message.add(id.idValue()); + ReceiverUtil.comsumeBuffer(ReceiverUtil.writeMessage(id, + message), + consumer); + } + } + } + + private static Map> expectedMessages() { + Random random = new Random(1); + Map> globalMessages = new HashMap<>(); + for (long i = 0L; i < 200L; i++) { + int count = random.nextInt(5); + for (int j = 0; j < count; j++) { + Id id = new LongId(random.nextInt(200)); + IdValueList message = new IdValueList(); + message.add(id.idValue()); + List messages = globalMessages.computeIfAbsent( + id, nid -> new ArrayList<>()); + messages.add(message); + } + } + return globalMessages; + } +} diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/input/ResuablePointerTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/input/ResuablePointerTest.java new file mode 100644 index 000000000..9ee364847 --- /dev/null +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/input/ResuablePointerTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.computer.core.compute.input; + +import java.io.IOException; + +import org.junit.Test; + +import com.baidu.hugegraph.computer.core.common.Constants; +import com.baidu.hugegraph.computer.core.io.BytesOutput; +import com.baidu.hugegraph.computer.core.io.IOFactory; +import com.baidu.hugegraph.testutil.Assert; + +public class ResuablePointerTest { + + @Test + public void testReadWrite() throws IOException { + BytesOutput output1 = IOFactory.createBytesOutput(100); + long position = output1.position(); + output1.writeFixedInt(0); + output1.writeInt(Integer.MAX_VALUE); + int length = (int) (output1.position() - position - Constants.INT_LEN); + output1.writeFixedInt(position, length); + + ReusablePointer p1 = new ReusablePointer(); + p1.read(IOFactory.createBytesInput(output1.buffer())); + + BytesOutput output2 = IOFactory.createBytesOutput(100); + p1.write(output2); + + ReusablePointer p2 = new ReusablePointer(); + p2.read(IOFactory.createBytesInput(output2.buffer())); + Assert.assertEquals(0, p1.compareTo(p2)); + Assert.assertEquals(p1.length(), p2.length()); + Assert.assertEquals(0L, p1.offset()); + Assert.assertEquals(0L, p2.offset()); + } +} diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/graph/partition/PartitionStatTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/graph/partition/PartitionStatTest.java index 7da34256f..ce99bc774 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/graph/partition/PartitionStatTest.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/graph/partition/PartitionStatTest.java @@ -23,6 +23,7 @@ import org.junit.Test; +import com.baidu.hugegraph.computer.core.receiver.RecvMessageStat; import com.baidu.hugegraph.computer.suite.unit.UnitTestBase; import com.baidu.hugegraph.testutil.Assert; @@ -55,6 +56,28 @@ public void testReadWrite() throws IOException { UnitTestBase.assertEqualAfterWriteAndRead(stat2, stat2ReadObj); } + @Test + public void testMerge() throws IOException { + PartitionStat stat1 = new PartitionStat(0, 1L, 2L); + stat1.merge(null); + Assert.assertEquals(1L, stat1.vertexCount()); + Assert.assertEquals(2L, stat1.edgeCount()); + Assert.assertEquals(0L, stat1.messageCount()); + Assert.assertEquals(0L, stat1.messageBytes()); + + RecvMessageStat recvMessageStat = new RecvMessageStat(100L, 400L); + stat1.merge(recvMessageStat); + Assert.assertEquals(1L, stat1.vertexCount()); + Assert.assertEquals(2L, stat1.edgeCount()); + Assert.assertEquals(100L, stat1.messageCount()); + Assert.assertEquals(400L, stat1.messageBytes()); + stat1.merge(recvMessageStat); + Assert.assertEquals(1L, stat1.vertexCount()); + Assert.assertEquals(2L, stat1.edgeCount()); + Assert.assertEquals(200L, stat1.messageCount()); + Assert.assertEquals(800L, stat1.messageBytes()); + } + @Test public void testEquals() { PartitionStat stat1 = new PartitionStat(0, 1L, 2L); diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/io/BufferedFileTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/io/BufferedFileTest.java index 1590a3d89..30520ffca 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/io/BufferedFileTest.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/io/BufferedFileTest.java @@ -247,7 +247,7 @@ public void testInputSeekOutRange() throws IOException { Assert.assertEquals(1, input.readInt()); input.skip(4); Assert.assertThrows(EOFException.class, () -> { - input.seek(12); // Out of range + input.seek(13); // Out of range }, e -> { Assert.assertContains("reach the end of file", e.getMessage()); diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/receiver/ReceiverUtil.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/receiver/ReceiverUtil.java index d5264db13..3e9665f29 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/receiver/ReceiverUtil.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/receiver/ReceiverUtil.java @@ -22,12 +22,18 @@ import java.io.IOException; import java.util.function.Consumer; +import com.baidu.hugegraph.computer.core.common.Constants; import com.baidu.hugegraph.computer.core.graph.id.Id; +import com.baidu.hugegraph.computer.core.io.BytesOutput; +import com.baidu.hugegraph.computer.core.io.IOFactory; import com.baidu.hugegraph.computer.core.io.RandomAccessInput; import com.baidu.hugegraph.computer.core.io.Readable; import com.baidu.hugegraph.computer.core.io.StreamGraphInput; +import com.baidu.hugegraph.computer.core.io.Writable; import com.baidu.hugegraph.computer.core.network.buffer.ManagedBuffer; import com.baidu.hugegraph.computer.core.network.buffer.NettyManagedBuffer; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntryOutput; +import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntryOutputImpl; import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.Pointer; import io.netty.buffer.ByteBuf; @@ -61,4 +67,17 @@ public static void readValue(Pointer pointer, Readable value) value.read(input); input.seek(position); } + + public static byte[] writeMessage(Id id, Writable message) + throws IOException { + BytesOutput bytesOutput = IOFactory.createBytesOutput( + Constants.SMALL_BUF_SIZE); + EntryOutput entryOutput = new EntryOutputImpl(bytesOutput); + + entryOutput.writeEntry(out -> { + out.writeByte(id.type().code()); + id.write(out); + }, message); + return bytesOutput.toByteArray(); + } } diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/receiver/message/ComputeMessageRecvPartitionTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/receiver/message/ComputeMessageRecvPartitionTest.java index 8ccf928ba..7e046fac1 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/receiver/message/ComputeMessageRecvPartitionTest.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/receiver/message/ComputeMessageRecvPartitionTest.java @@ -27,7 +27,6 @@ import org.junit.Test; import com.baidu.hugegraph.computer.core.combiner.DoubleValueSumCombiner; -import com.baidu.hugegraph.computer.core.common.Constants; import com.baidu.hugegraph.computer.core.config.ComputerOptions; import com.baidu.hugegraph.computer.core.config.Config; import com.baidu.hugegraph.computer.core.config.Null; @@ -35,9 +34,6 @@ import com.baidu.hugegraph.computer.core.graph.id.LongId; import com.baidu.hugegraph.computer.core.graph.value.DoubleValue; import com.baidu.hugegraph.computer.core.graph.value.IdValueList; -import com.baidu.hugegraph.computer.core.io.BytesOutput; -import com.baidu.hugegraph.computer.core.io.IOFactory; -import com.baidu.hugegraph.computer.core.io.Writable; import com.baidu.hugegraph.computer.core.network.buffer.ManagedBuffer; import com.baidu.hugegraph.computer.core.network.message.MessageType; import com.baidu.hugegraph.computer.core.receiver.ReceiverUtil; @@ -45,8 +41,6 @@ import com.baidu.hugegraph.computer.core.sort.sorting.SortManager; import com.baidu.hugegraph.computer.core.store.FileManager; import com.baidu.hugegraph.computer.core.store.SuperstepFileGenerator; -import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntryOutput; -import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntryOutputImpl; import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry; import com.baidu.hugegraph.computer.suite.unit.UnitTestBase; import com.baidu.hugegraph.testutil.Assert; @@ -128,7 +122,9 @@ public static void addTwentyCombineMessageBuffer( for (int j = 0; j < 2; j++) { Id id = new LongId(i); DoubleValue message = new DoubleValue(i); - ReceiverUtil.comsumeBuffer(writeMessage(id, message), consumer); + ReceiverUtil.comsumeBuffer(ReceiverUtil.writeMessage(id, + message), + consumer); } } } @@ -164,24 +160,13 @@ public static void checkTenCombineMessages(PeekableIterator it) Id id = new LongId(i); IdValueList message = new IdValueList(); message.add(id.idValue()); - ReceiverUtil.comsumeBuffer(writeMessage(id, message), consumer); + ReceiverUtil.comsumeBuffer(ReceiverUtil.writeMessage(id, + message), + consumer); } } } - private static byte[] writeMessage(Id id, Writable message) - throws IOException { - BytesOutput bytesOutput = IOFactory.createBytesOutput( - Constants.SMALL_BUF_SIZE); - EntryOutput entryOutput = new EntryOutputImpl(bytesOutput); - - entryOutput.writeEntry(out -> { - out.writeByte(id.type().code()); - id.write(out); - }, message); - return bytesOutput.toByteArray(); - } - private static void checkIdValueListMessages(PeekableIterator it) throws IOException { for (long i = 0L; i < 10L; i++) { diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockComputation.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockComputation.java index 8f0e09010..348e24083 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockComputation.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockComputation.java @@ -56,8 +56,6 @@ public void beforeSuperstep(WorkerContext context) { } else if (context.superstep() == 1) { this.assertStep1Aggregators(context); } - - this.assertStat(context); } @Override @@ -129,11 +127,6 @@ private void assertAggregateValueWithError(WorkerContext context) { }); } - protected void assertStat(WorkerContext context) { - Assert.assertEquals(100L, context.totalVertexCount()); - Assert.assertEquals(200L, context.totalEdgeCount()); - } - protected void createAndRunAggregators(WorkerContext context) { // AGGR_CUSTOM_INT this.aggrCustomInt = context.createAggregator( @@ -403,7 +396,7 @@ public String category() { @Override public void compute0(ComputationContext context, Vertex vertex) { - // pass + vertex.value(new DoubleValue(0.5D)); } @Override diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockComputation2.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockComputation2.java index 7fc34bbc0..1e665b8a8 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockComputation2.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockComputation2.java @@ -27,12 +27,6 @@ public class MockComputation2 extends MockComputation { - @Override - protected void assertStat(WorkerContext context) { - Assert.assertEquals(200L, context.totalVertexCount()); - Assert.assertEquals(400L, context.totalEdgeCount()); - } - @Override protected void assertStep1Aggregators(WorkerContext context) { Assert.assertEquals(new IntValue(10), context.aggregatedValue( diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockMasterComputation.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockMasterComputation.java index cd306261a..79afbc8a7 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockMasterComputation.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockMasterComputation.java @@ -165,8 +165,6 @@ private void registerAggregatorWithError(MasterContext context) { @Override public boolean compute(MasterComputationContext context) { - this.assertStat(context); - if (context.superstep() == 0) { this.assertStep0Aggregators(context); this.updateStep0Aggregators(context); @@ -178,11 +176,11 @@ public boolean compute(MasterComputationContext context) { } protected void assertStat(MasterComputationContext context) { - Assert.assertEquals(100L, context.totalVertexCount()); - Assert.assertEquals(200L, context.totalEdgeCount()); - Assert.assertEquals(50L, context.finishedVertexCount()); - Assert.assertEquals(60L, context.messageCount()); - Assert.assertEquals(70L, context.messageBytes()); + Assert.assertEquals(6L, context.totalVertexCount()); + Assert.assertEquals(5L, context.totalEdgeCount()); + Assert.assertEquals(0L, context.finishedVertexCount()); + Assert.assertEquals(0L, context.messageCount()); + Assert.assertEquals(0L, context.messageBytes()); } protected void assertStep0Aggregators(MasterComputationContext context) { diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockMasterComputation2.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockMasterComputation2.java index 36c22cffe..140c641bc 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockMasterComputation2.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/MockMasterComputation2.java @@ -28,15 +28,6 @@ public class MockMasterComputation2 extends MockMasterComputation { - @Override - protected void assertStat(MasterComputationContext context) { - Assert.assertEquals(200L, context.totalVertexCount()); - Assert.assertEquals(400L, context.totalEdgeCount()); - Assert.assertEquals(100L, context.finishedVertexCount()); - Assert.assertEquals(120L, context.messageCount()); - Assert.assertEquals(140L, context.messageBytes()); - } - @Override protected void assertStep0Aggregators(MasterComputationContext context) { Assert.assertEquals(new IntValue(10), context.aggregatedValue( diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/WorkerServiceTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/WorkerServiceTest.java index ad402e2d0..a216ca0ca 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/WorkerServiceTest.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/core/worker/WorkerServiceTest.java @@ -30,6 +30,7 @@ import com.baidu.hugegraph.computer.core.common.exception.ComputerException; import com.baidu.hugegraph.computer.core.config.ComputerOptions; import com.baidu.hugegraph.computer.core.config.Config; +import com.baidu.hugegraph.computer.core.graph.value.DoubleValue; import com.baidu.hugegraph.computer.core.master.MasterService; import com.baidu.hugegraph.computer.suite.unit.UnitTestBase; import com.baidu.hugegraph.config.RpcOptions; @@ -56,11 +57,15 @@ public void testServiceWith1Worker() throws InterruptedException { ComputerOptions.BSP_LOG_INTERVAL, "30000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.WORKER_COMPUTATION_CLASS, - MockComputation.class.getName() + MockComputation.class.getName(), + ComputerOptions.ALGORITHM_RESULT_CLASS, + DoubleValue.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + DoubleValue.class.getName() ); WorkerService workerService = new MockWorkerService(); try { - Thread.sleep(2000); + Thread.sleep(2000L); workerService.init(config); workerService.execute(); } catch (Throwable e) { @@ -82,7 +87,11 @@ public void testServiceWith1Worker() throws InterruptedException { ComputerOptions.BSP_LOG_INTERVAL, "30000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.MASTER_COMPUTATION_CLASS, - MockMasterComputation.class.getName() + MockMasterComputation.class.getName(), + ComputerOptions.ALGORITHM_RESULT_CLASS, + DoubleValue.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + DoubleValue.class.getName() ); MasterService masterService = new MasterService(); try { @@ -92,6 +101,11 @@ public void testServiceWith1Worker() throws InterruptedException { LOG.error("Failed to start master", e); exceptions[1] = e; } finally { + /* + * It must close the service first. The pool will be shutdown + * if count down is executed first, and the server thread in + * master service will not be closed. + */ masterService.close(); countDownLatch.countDown(); } @@ -121,7 +135,11 @@ public void testServiceWith2Workers() throws InterruptedException { ComputerOptions.BSP_LOG_INTERVAL, "10000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.WORKER_COMPUTATION_CLASS, - MockComputation2.class.getName() + MockComputation2.class.getName(), + ComputerOptions.ALGORITHM_RESULT_CLASS, + DoubleValue.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + DoubleValue.class.getName() ); WorkerService workerService = new MockWorkerService(); try { @@ -148,7 +166,11 @@ public void testServiceWith2Workers() throws InterruptedException { ComputerOptions.BSP_LOG_INTERVAL, "10000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.WORKER_COMPUTATION_CLASS, - MockComputation2.class.getName() + MockComputation2.class.getName(), + ComputerOptions.ALGORITHM_RESULT_CLASS, + DoubleValue.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + DoubleValue.class.getName() ); WorkerService workerService = new MockWorkerService(); try { @@ -175,7 +197,11 @@ public void testServiceWith2Workers() throws InterruptedException { ComputerOptions.BSP_LOG_INTERVAL, "10000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.MASTER_COMPUTATION_CLASS, - MockMasterComputation2.class.getName() + MockMasterComputation2.class.getName(), + ComputerOptions.ALGORITHM_RESULT_CLASS, + DoubleValue.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + DoubleValue.class.getName() ); MasterService masterService = new MasterService(); try { diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/integrate/MockComputation.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/integrate/MockComputation.java index 492e2eded..ee763c6b5 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/integrate/MockComputation.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/integrate/MockComputation.java @@ -40,12 +40,12 @@ public String category() { @Override public void compute0(ComputationContext context, Vertex vertex) { - + vertex.value(new DoubleValue(0.5D)); } @Override public void compute(ComputationContext context, Vertex vertex, Iterator messages) { - + // pass } } diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/integrate/SenderIntegrateTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/integrate/SenderIntegrateTest.java index 20aa4a4ad..50b69c7ee 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/integrate/SenderIntegrateTest.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/integrate/SenderIntegrateTest.java @@ -31,6 +31,7 @@ import com.baidu.hugegraph.computer.core.common.ComputerContext; import com.baidu.hugegraph.computer.core.common.exception.TransportException; import com.baidu.hugegraph.computer.core.config.ComputerOptions; +import com.baidu.hugegraph.computer.core.graph.value.DoubleValue; import com.baidu.hugegraph.computer.core.manager.Managers; import com.baidu.hugegraph.computer.core.master.MasterService; import com.baidu.hugegraph.computer.core.network.DataClientManager; @@ -63,8 +64,9 @@ public void testOneWorker() throws InterruptedException { Thread masterThread = new Thread(() -> { String[] args = OptionsBuilder.newInstance() .withJobId("local_002") - .withValueName("rank") - .withValueType("DOUBLE") + .withResultName("rank") + .withResultClass(DoubleValue.class) + .withMessageClass(DoubleValue.class) .withMaxSuperStep(3) .withComputationClass(COMPUTATION) .withWorkerCount(1) @@ -88,8 +90,9 @@ public void testOneWorker() throws InterruptedException { Thread workerThread = new Thread(() -> { String[] args = OptionsBuilder.newInstance() .withJobId("local_002") - .withValueName("rank") - .withValueType("DOUBLE") + .withResultName("rank") + .withResultClass(DoubleValue.class) + .withMessageClass(DoubleValue.class) .withMaxSuperStep(3) .withComputationClass(COMPUTATION) .withWorkerCount(1) @@ -123,8 +126,9 @@ public void testTwoWorkers() throws InterruptedException { Thread masterThread = new Thread(() -> { String[] args = OptionsBuilder.newInstance() .withJobId("local_003") - .withValueName("rank") - .withValueType("DOUBLE") + .withResultName("rank") + .withResultClass(DoubleValue.class) + .withMessageClass(DoubleValue.class) .withMaxSuperStep(3) .withComputationClass(COMPUTATION) .withWorkerCount(2) @@ -143,8 +147,9 @@ public void testTwoWorkers() throws InterruptedException { Thread workerThread1 = new Thread(() -> { String[] args = OptionsBuilder.newInstance() .withJobId("local_003") - .withValueName("rank") - .withValueType("DOUBLE") + .withResultName("rank") + .withResultClass(DoubleValue.class) + .withMessageClass(DoubleValue.class) .withMaxSuperStep(3) .withComputationClass(COMPUTATION) .withWorkerCount(2) @@ -164,8 +169,9 @@ public void testTwoWorkers() throws InterruptedException { Thread workerThread2 = new Thread(() -> { String[] args = OptionsBuilder.newInstance() .withJobId("local_003") - .withValueName("rank") - .withValueType("DOUBLE") + .withResultName("rank") + .withResultClass(DoubleValue.class) + .withMessageClass(DoubleValue.class) .withMaxSuperStep(3) .withComputationClass(COMPUTATION) .withWorkerCount(2) @@ -197,8 +203,9 @@ public void testOneWorkerWithBusyClient() throws InterruptedException { Thread masterThread = new Thread(() -> { String[] args = OptionsBuilder.newInstance() .withJobId("local_002") - .withValueName("rank") - .withValueType("DOUBLE") + .withResultName("rank") + .withResultClass(DoubleValue.class) + .withMessageClass(DoubleValue.class) .withMaxSuperStep(3) .withComputationClass(COMPUTATION) .withWorkerCount(1) @@ -222,8 +229,9 @@ public void testOneWorkerWithBusyClient() throws InterruptedException { Thread workerThread = new Thread(() -> { String[] args = OptionsBuilder.newInstance() .withJobId("local_002") - .withValueName("rank") - .withValueType("DOUBLE") + .withResultName("rank") + .withResultClass(DoubleValue.class) + .withMessageClass(DoubleValue.class) .withMaxSuperStep(3) .withComputationClass(COMPUTATION) .withWorkerCount(1) @@ -317,13 +325,21 @@ public OptionsBuilder withJobId(String jobId) { return this; } - public OptionsBuilder withValueName(String name) { - this.options.add(name); + public OptionsBuilder withResultClass(Class clazz) { + this.options.add(ComputerOptions.ALGORITHM_RESULT_CLASS.name()); + this.options.add(clazz.getName()); return this; } - public OptionsBuilder withValueType(String type) { - this.options.add(type); + public OptionsBuilder withMessageClass(Class clazz) { + this.options.add(ComputerOptions.ALGORITHM_MESSAGE_CLASS.name()); + this.options.add(clazz.getName()); + return this; + } + + public OptionsBuilder withResultName(String name) { + this.options.add(ComputerOptions.OUTPUT_RESULT_NAME.name()); + this.options.add(name); return this; } diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestSuite.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestSuite.java index 5caa8763d..f16deac8c 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestSuite.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/suite/unit/UnitTestSuite.java @@ -28,6 +28,7 @@ import com.baidu.hugegraph.computer.core.bsp.BspTestSuite; import com.baidu.hugegraph.computer.core.combiner.CombinerTestSuite; import com.baidu.hugegraph.computer.core.common.CommonTestSuite; +import com.baidu.hugegraph.computer.core.compute.ComputeTestSuite; import com.baidu.hugegraph.computer.core.config.ComputerOptions; import com.baidu.hugegraph.computer.core.config.ConfigTestSuite; import com.baidu.hugegraph.computer.core.graph.GraphTestSuite; @@ -63,7 +64,8 @@ SorterTestSuite.class, SortingTestSuite.class, SenderTestSuite.class, - ReceiverTestSuite.class + ReceiverTestSuite.class, + ComputeTestSuite.class }) public class UnitTestSuite {