Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

[REEF-1978] Adding Checkpoint handler for IMRU master #1429

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="utf-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -34,8 +34,12 @@ under the License.
<BuildPackage>true</BuildPackage>
</PropertyGroup>
<ItemGroup>
<Reference Include="Newtonsoft.Json">
<HintPath>$(PackagesDir)\Newtonsoft.Json.$(NewtonsoftJsonVersion)\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Runtime.Serialization" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
Expand All @@ -52,6 +56,7 @@ under the License.
<Compile Include="PipelinedBroadcastReduce\PipelinedBroadcastAndReduceWithFaultTolerant.cs" />
<Compile Include="PipelinedBroadcastReduce\MapTaskState.cs" />
<Compile Include="PipelinedBroadcastReduce\UpdateTaskState.cs" />
<Compile Include="PipelinedBroadcastReduce\UpdateTaskStateCodec.cs" />
<Compile Include="SingleIterUpdateFunction.cs" />
<Compile Include="NaturalSum\NaturalSum.cs" />
<Compile Include="NaturalSum\NaturalSumMapFunction.cs" />
Expand Down Expand Up @@ -113,7 +118,9 @@ under the License.
<Link>App.config</Link>
</None>
<None Include="Org.Apache.REEF.IMRU.Examples.nuspec" />
<None Include="packages.config" />
<None Include="packages.config">
<SubType>Designer</SubType>
</None>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
using System.Globalization;
using System.IO;
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
using Org.Apache.REEF.IMRU.OnREEF.ResultHandler;
using Org.Apache.REEF.IO.PartitionedData.Random;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Tang;
Expand All @@ -31,7 +31,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
/// <summary>
/// IMRU program that performs broadcast and reduce
/// </summary>
public class PipelinedBroadcastAndReduce
internal class PipelinedBroadcastAndReduce
{
protected readonly IIMRUClient _imruClient;

Expand Down Expand Up @@ -62,6 +62,8 @@ protected virtual IMRUJobDefinitionBuilder CreateJobDefinitionBuilder(int number
.SetMapInputPipelineDataConverterConfiguration(MapInputDataConverterConfig(chunkSize))
.SetMapOutputPipelineDataConverterConfiguration(MapOutputDataConverterConfig(chunkSize))
.SetPartitionedDatasetConfiguration(PartitionedDatasetConfiguration(numberofMappers))
.SetResultHandlerConfiguration(BuildResultHandlerConfig())
.SetCheckpointConfiguration(BuildCheckpointConfig())
.SetJobName("BroadcastReduce")
.SetNumberOfMappers(numberofMappers)
.SetMapperMemory(mapperMemory)
Expand Down Expand Up @@ -174,5 +176,24 @@ protected virtual IConfiguration BuildMapperFunctionConfig()
GenericType<BroadcastReceiverReduceSenderMapFunction>.Class)
.Build();
}

/// <summary>
/// Build checkpoint configuration. Subclass can override it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer not to use public abstract classes as APIs. Consider re-structuring this using composition instead of inheritance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This follows the existing pattern for obtaining configurations. It lets a client share the same CreateJobDefinitionBuilder while overriding the configuration in its own way. If we really want to change it, that needs to be done in a different PR, as the change must be consistent across the other methods.

/// </summary>
protected virtual IConfiguration BuildCheckpointConfig()
{
    // The default checkpoint configuration carries no bindings at all;
    // it is simply whatever a fresh configuration builder produces.
    var emptyBuilder = TangFactory.GetTang().NewConfigurationBuilder();
    return emptyBuilder.Build();
}

/// <summary>
/// Creates the default result handler configuration, which binds the
/// IIMRUResultHandler of int[] to WriteResultHandler of int[].
/// The method is virtual so that a derived class can supply its own configuration.
/// </summary>
/// <returns>The built result handler configuration.</returns>
protected virtual IConfiguration BuildResultHandlerConfig()
{
    var handlerBinding = TangFactory.GetTang()
        .NewConfigurationBuilder()
        .BindImplementation(
            GenericType<IIMRUResultHandler<int[]>>.Class,
            GenericType<WriteResultHandler<int[]>>.Class);

    return handlerBinding.Build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,27 @@
using System.Globalization;
using System.IO;
using System.Linq;
using Newtonsoft.Json;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler;
using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
using Org.Apache.REEF.IMRU.OnREEF.Parameters;
using Org.Apache.REEF.IO.TempFileCreation;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote;

namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
{
/// <summary>
/// IMRU program that performs broadcast and reduce with fault tolerance.
/// </summary>
public sealed class PipelinedBroadcastAndReduceWithFaultTolerant : PipelinedBroadcastAndReduce
internal sealed class PipelinedBroadcastAndReduceWithFaultTolerant : PipelinedBroadcastAndReduce
{
private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedBroadcastAndReduceWithFaultTolerant));

Expand Down Expand Up @@ -69,6 +73,8 @@ protected override IMRUJobDefinitionBuilder CreateJobDefinitionBuilder(int numbe
.SetMapInputPipelineDataConverterConfiguration(MapInputDataConverterConfig(chunkSize))
.SetMapOutputPipelineDataConverterConfiguration(MapOutputDataConverterConfig(chunkSize))
.SetPartitionedDatasetConfiguration(PartitionedDatasetConfiguration(numberofMappers))
.SetResultHandlerConfiguration(BuildResultHandlerConfig())
.SetCheckpointConfiguration(BuildCheckpointConfig())
.SetJobName("BroadcastReduce")
.SetNumberOfMappers(numberofMappers)
.SetMapperMemory(mapperMemory)
Expand Down Expand Up @@ -108,6 +114,19 @@ protected override IConfiguration BuildUpdateFunctionConfig()
GenericType<BroadcastSenderReduceReceiverUpdateFunctionFT>.Class).Build();
}

/// <summary>
/// Build checkpoint configuration that sets the checkpoint file path to a newly created temp directory and uses UpdateTaskStateCodec as the task state codec.
/// </summary>
protected override IConfiguration BuildCheckpointConfig()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. It is a sample client class that mainly contains driver configurations. I will change the class to internal.

{
var filePath = TangFactory.GetTang().NewInjector().GetInstance<ITempFileCreator>().CreateTempDirectory("statefiles", string.Empty);

return CheckpointConfigurationBuilder.ConfigurationModule
.Set(CheckpointConfigurationBuilder.CheckpointFilePath, filePath)
.Set(CheckpointConfigurationBuilder.TaskStateCodec, GenericType<UpdateTaskStateCodec>.Class)
.Build();
}

/// <summary>
/// Configuration for Update task state
/// </summary>
Expand Down Expand Up @@ -290,12 +309,15 @@ internal sealed class BroadcastSenderReduceReceiverUpdateFunctionFT : IUpdateFun
private readonly int[] _intArr;
private readonly int _workers;
private readonly UpdateTaskState<int[], int[]> _taskState;
private readonly IIMRUCheckpointHandler _stateHandler;

[Inject]
private BroadcastSenderReduceReceiverUpdateFunctionFT(
[Parameter(typeof(BroadcastReduceConfiguration.NumberOfIterations))] int maxIters,
[Parameter(typeof(BroadcastReduceConfiguration.Dimensions))] int dim,
[Parameter(typeof(BroadcastReduceConfiguration.NumWorkers))] int numWorkers,
[Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
IIMRUCheckpointHandler stateHandler,
ITaskState taskState)
{
_maxIters = maxIters;
Expand All @@ -304,6 +326,11 @@ private BroadcastSenderReduceReceiverUpdateFunctionFT(
_intArr = new int[_dim];
_workers = numWorkers;
_taskState = (UpdateTaskState<int[], int[]>)taskState;

_stateHandler = stateHandler;

int retryNumber;
int.TryParse(taskId[taskId.Length - 1].ToString(), out retryNumber);
}

/// <summary>
Expand Down Expand Up @@ -343,6 +370,8 @@ UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Update(int[] inp
/// <returns>Map input</returns>
UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Initialize()
{
RestoreState();

if (_taskState.Result != null)
{
Restore(_taskState.Result);
Expand Down Expand Up @@ -372,7 +401,7 @@ private void SaveState(int[] value)
{
_taskState.Iterations = _iterations;
_taskState.Input = value;
Logger.Log(Level.Info, "State saved: {0}", _taskState.Input[0]);
PersistState();
}

/// <summary>
Expand All @@ -383,7 +412,7 @@ private void SaveResult(int[] value)
{
_taskState.Iterations = _iterations;
_taskState.Result = value;
Logger.Log(Level.Info, "Result saved: {0}", _taskState.Result[0]);
PersistState();
}

/// <summary>
Expand All @@ -397,6 +426,28 @@ private void Restore(int[] d)
_intArr[i] = d[i];
}
}

/// <summary>
/// Logs the current task state as JSON and then hands it to the checkpoint handler to persist.
/// </summary>
private void PersistState()
{
    string serializedState = JsonConvert.SerializeObject(_taskState);
    Logger.Log(Level.Info, "SaveState:currentState: {0}", serializedState);

    _stateHandler.Persist(_taskState);
}

/// <summary>
/// Asks the checkpoint handler for a previously persisted state and, when one is returned,
/// logs its content and copies it into the current task state.
/// Does nothing when the handler returns null.
/// </summary>
private void RestoreState()
{
    var restored = (UpdateTaskState<int[], int[]>)_stateHandler.Restore();
    if (restored == null)
    {
        return;
    }

    // Render the arrays as comma-separated lists; a missing array is logged as an empty string.
    string inputText = restored.Input == null ? string.Empty : string.Join(",", restored.Input);
    string resultText = restored.Result == null ? string.Empty : string.Join(",", restored.Result);

    Logger.Log(
        Level.Info,
        "RestoreState:DeserializeObject: input: {0}, iteration: {1}, result: {2}.",
        inputText,
        restored.Iterations,
        resultText);

    _taskState.Update(restored);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

using System.Runtime.Serialization;
using Newtonsoft.Json;
using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Logging;
Expand All @@ -26,25 +28,37 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
/// </summary>
/// <typeparam name="TMapInput"></typeparam>
/// <typeparam name="TResult"></typeparam>
[DataContract]
internal sealed class UpdateTaskState<TMapInput, TResult> : ITaskState
{
private static readonly Logger Logger = Logger.GetLogger(typeof(UpdateTaskState<TMapInput, TResult>));

[DataMember]
internal TMapInput Input { get; set; }

[DataMember]
internal TResult Result { get; set; }

/// <summary>
/// Keep the current iteration number
/// </summary>
internal int Iterations { get; set; }
[DataMember]
internal int Iterations { get; set; }

/// <summary>
/// Parameterless constructor for UpdateTaskState. It sets nothing, so Input, Result and
/// Iterations keep their default values until assigned.
/// The constructor is private; the [Inject] attribute marks it for Tang injection and
/// [JsonConstructor] marks it as the constructor Json.NET should use when deserializing.
/// </summary>
[Inject]
[JsonConstructor]
private UpdateTaskState()
{
}

/// <summary>
/// Copies Input, Result and Iterations from the given task state into this instance.
/// </summary>
/// <param name="taskState">The state to copy from. Must not be null.</param>
/// <exception cref="System.ArgumentNullException">Thrown when <paramref name="taskState"/> is null.</exception>
internal void Update(UpdateTaskState<TMapInput, TResult> taskState)
{
    // Fail with a descriptive exception instead of a NullReferenceException on the first property read.
    if (taskState == null)
    {
        throw new System.ArgumentNullException("taskState");
    }

    Input = taskState.Input;
    Result = taskState.Result;
    Iterations = taskState.Iterations;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using Newtonsoft.Json;
using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Wake.Remote;

namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
{
/// <summary>
/// Codec that converts the update task state to and from bytes by way of a JSON string.
/// </summary>
internal sealed class UpdateTaskStateCodec : ICodec<ITaskState>
{
    /// <summary>
    /// Private constructor; instances are created through Tang injection.
    /// </summary>
    [Inject]
    private UpdateTaskStateCodec()
    {
    }

    /// <summary>
    /// Deserializes bytes into an ITaskState object.
    /// The bytes are converted to a string and parsed as JSON into an UpdateTaskState of int[] and int[].
    /// </summary>
    /// <param name="data">The encoded task state.</param>
    /// <returns>The deserialized task state.</returns>
    public ITaskState Decode(byte[] data)
    {
        return JsonConvert.DeserializeObject<UpdateTaskState<int[], int[]>>(
            ByteUtilities.ByteArraysToString(data));
    }

    /// <summary>
    /// Serializes an ITaskState into bytes.
    /// The state is written as a JSON string, which is then converted to a byte array.
    /// </summary>
    /// <param name="taskState">The task state to encode.</param>
    /// <returns>The bytes of the JSON representation.</returns>
    public byte[] Encode(ITaskState taskState)
    {
        return ByteUtilities.StringToByteArrays(
            JsonConvert.SerializeObject(taskState));
    }
}
}
1 change: 1 addition & 0 deletions lang/cs/Org.Apache.REEF.IMRU.Examples/packages.config
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ specific language governing permissions and limitations
under the License.
-->
<packages>
<package id="Newtonsoft.Json" version="10.0.3" targetFramework="net451" />
<package id="StyleCop.MSBuild" version="5.0.0" targetFramework="net45" developmentDependency="true" />
</packages>
2 changes: 2 additions & 0 deletions lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ private IConfiguration GetDriverConfig<TMapInput, TMapOutput, TResult, TPartitio
_configurationSerializer.ToString(jobDefinition.ReduceFunctionConfiguration))
.BindNamedParameter(typeof(SerializedResultHandlerConfiguration),
_configurationSerializer.ToString(jobDefinition.ResultHandlerConfiguration))
.BindNamedParameter(typeof(SerializedCheckpointConfiguration),
_configurationSerializer.ToString(jobDefinition.CheckpointConfiguration))
.BindNamedParameter(typeof(MemoryPerMapper),
jobDefinition.MapperMemory.ToString(CultureInfo.InvariantCulture))
.BindNamedParameter(typeof(MemoryForUpdateTask),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="utf-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -35,8 +35,12 @@ under the License.
<RestorePackages>true</RestorePackages>
</PropertyGroup>
<ItemGroup>
<Reference Include="Newtonsoft.Json">
<HintPath>$(PackagesDir)\Newtonsoft.Json.$(NewtonsoftJsonVersion)\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Runtime.Serialization" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
Expand All @@ -59,6 +63,7 @@ under the License.
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="MapperCountTest.cs" />
<Compile Include="TestActiveContextManager.cs" />
<Compile Include="TestCheckpointHandler.cs" />
<Compile Include="TestEvaluatorManager.cs" />
<Compile Include="TestSystemStates.cs" />
<Compile Include="TestTaskManager.cs" />
Expand Down
Loading