From 71a72c11035e5e0aecf213ad9d6297f799b44f86 Mon Sep 17 00:00:00 2001 From: jwang98052 Date: Fri, 2 Feb 2018 12:33:04 -0800 Subject: [PATCH 1/4] [REEF-1978] Adding Checkpoint handler for IMRU master * Adding IMRUCheckpointHandler to handle task state persistent * Added configuration module for IMRUCheckpointHandler * Update IMRUJobDefination to all client to set checkpoint configuration * Add UpdateTaskStateCodec implementation * Update IMRU examples to set checkpoint config and call the check point handler * Update test cases JIRA: [REEF-1978](https://issues.apache.org/jira/browse/REEF-1978) This closes # --- .../Org.Apache.REEF.IMRU.Examples.csproj | 11 +- .../PipelinedBroadcastAndReduce.cs | 23 +++- ...inedBroadcastAndReduceWithFaultTolerant.cs | 62 ++++++++++- .../UpdateTaskState.cs | 18 +++- .../UpdateTaskStateCodec.cs | 45 ++++++++ .../packages.config | 1 + .../API/IIMRUCheckpointHandler.cs | 50 +++++++++ .../API/IMRUJobDefinition.cs | 13 +++ .../API/IMRUJobDefinitionBuilder.cs | 14 +++ .../CheckpointConfigurationModule.cs | 38 +++++++ .../CheckpointHandler/CheckpointFilePath.cs | 26 +++++ .../IMRUCheckpointHandler.cs | 101 ++++++++++++++++++ .../OnREEF/Client/REEFIMRUClient.cs | 2 + .../OnREEF/Driver/ConfigurationManager.cs | 22 +++- .../OnREEF/Driver/IMRUDriver.cs | 1 + .../SerializedCheckpointConfiguration.cs | 26 +++++ .../Org.Apache.REEF.IMRU.csproj | 7 +- .../IMRU/IMRUBrodcastReduceTestBase.cs | 12 +++ .../IMRU/TestFailMapperEvaluators.cs | 18 +++- 19 files changed, 480 insertions(+), 10 deletions(-) create mode 100644 lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskStateCodec.cs create mode 100644 lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs create mode 100644 lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointConfigurationModule.cs create mode 100644 lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointFilePath.cs create mode 100644 
lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs create mode 100644 lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedCheckpointConfiguration.cs diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj index 43d3b24bad..b1fcff3a53 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj @@ -1,4 +1,4 @@ - + + \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs new file mode 100644 index 0000000000..0801fe8dcd --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler; +using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.IMRU.API +{ + /// + /// It is responsible for save and restore ITaskState object with the given ICodec + /// + [DefaultImplementation(typeof(IMRUCheckpointHandler))] + public interface IIMRUCheckpointHandler + { + /// + /// Persistent ITaskState object with the given ICodec. + /// + /// + /// + void Persistent(ITaskState taskState, ICodec codec); + + /// + /// Restore the data and decode it with the given ICodec. + /// + /// + /// + ITaskState Restore(ICodec codec); + + /// + /// Reset persistent state + /// + void Reset(); + } +} \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs index e2ffc6d63b..f028daff61 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs @@ -38,6 +38,7 @@ public sealed class IMRUJobDefinition private readonly IConfiguration _mapInputPipelineDataConverterConfiguration; private readonly IConfiguration _partitionedDatasetConfiguration; private readonly IConfiguration _resultHandlerConfiguration; + private readonly IConfiguration _checkpointConfiguration; private readonly IConfiguration _jobCancelSignalConfiguration; private readonly int _numberOfMappers; private readonly int _memoryPerMapper; @@ -66,6 +67,7 @@ public sealed class IMRUJobDefinition /// Configuration of partitioned /// dataset /// Configuration of result handler + /// Configuration of checkpoint /// Per mapper configuration /// Number of mappers /// Per Mapper memory. 
@@ -88,6 +90,7 @@ internal IMRUJobDefinition( IConfiguration mapInputPipelineDataConverterConfiguration, IConfiguration partitionedDatasetConfiguration, IConfiguration resultHandlerConfiguration, + IConfiguration checkpointConfiguration, IConfiguration jobCancelSignalConfiguration, ISet perMapConfigGeneratorConfig, int numberOfMappers, @@ -119,6 +122,7 @@ internal IMRUJobDefinition( _perMapConfigGeneratorConfig = perMapConfigGeneratorConfig; _invokeGC = invokeGC; _resultHandlerConfiguration = resultHandlerConfiguration; + _checkpointConfiguration = checkpointConfiguration; _jobCancelSignalConfiguration = jobCancelSignalConfiguration; } @@ -287,6 +291,15 @@ internal IConfiguration ResultHandlerConfiguration get { return _resultHandlerConfiguration; } } + /// + /// Configuration of the checkpoint + /// + /// + internal IConfiguration CheckpointConfiguration + { + get { return _checkpointConfiguration; } + } + /// /// Configuration for job cancellation signal implementation /// diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs index f3fe89c4ce..b750c47506 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs @@ -50,6 +50,7 @@ public sealed class IMRUJobDefinitionBuilder private IConfiguration _mapInputPipelineDataConverterConfiguration; private IConfiguration _partitionedDatasetConfiguration; private IConfiguration _resultHandlerConfiguration; + private IConfiguration _checkPointConfiguration; private IConfiguration _jobCancellationConfiguration; private readonly ISet _perMapConfigGeneratorConfig; private bool _invokeGC; @@ -68,6 +69,7 @@ public IMRUJobDefinitionBuilder() _mapOutputPipelineDataConverterConfiguration = EmptyConfiguration; _partitionedDatasetConfiguration = EmptyConfiguration; _resultHandlerConfiguration = EmptyConfiguration; + _checkPointConfiguration = EmptyConfiguration; 
_memoryPerMapper = 512; _updateTaskMemory = 512; _coresPerMapper = 1; @@ -295,6 +297,17 @@ public IMRUJobDefinitionBuilder SetResultHandlerConfiguration(IConfiguration res return this; } + /// + /// Sets checkpoint Configuration + /// + /// Checkpoint config + /// + public IMRUJobDefinitionBuilder SetCheckpointConfiguration(IConfiguration checkpointConfig) + { + _checkPointConfiguration = checkpointConfig; + return this; + } + /// /// Whether to invoke Garbage Collector after each IMRU iteration /// @@ -368,6 +381,7 @@ public IMRUJobDefinition Build() _mapInputPipelineDataConverterConfiguration, _partitionedDatasetConfiguration, _resultHandlerConfiguration, + _checkPointConfiguration, _jobCancellationConfiguration, _perMapConfigGeneratorConfig, _numberOfMappers, diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointConfigurationModule.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointConfigurationModule.cs new file mode 100644 index 0000000000..95d66e147f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointConfigurationModule.cs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler +{ + public sealed class CheckpointConfigurationModule : ConfigurationModuleBuilder + { + public static readonly RequiredParameter CheckpointFile = new RequiredParameter(); + + public static readonly RequiredImpl> TaskStateCodec = new RequiredImpl>(); + + public static readonly ConfigurationModule ConfigurationModule = new CheckpointConfigurationModule() + .BindNamedParameter(GenericType.Class, CheckpointFile) + .BindImplementation(GenericType.Class, GenericType.Class) + .BindImplementation(GenericType>.Class, TaskStateCodec) + .Build(); + } +} \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointFilePath.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointFilePath.cs new file mode 100644 index 0000000000..c69fc18fe9 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointFilePath.cs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler +{ + [NamedParameter("Checkpoint file name")] + public class CheckpointFilePath : Name + { + } +} \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs new file mode 100644 index 0000000000..192e03e242 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using System.IO; +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; +using Org.Apache.REEF.IO.FileSystem; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler +{ + /// + /// Default implementation of IIMRUCheckpointHandler + /// + public class IMRUCheckpointHandler : IIMRUCheckpointHandler + { + private static readonly Logger Logger = Logger.GetLogger(typeof(IMRUCheckpointHandler)); + + private readonly IFileSystem _fileSystem; + private readonly Uri _checkpointFileUrl; + private readonly string _localFile; + + /// + /// It is for storing and retrieving checkpoint data. + /// + /// The file path where the checkpoint data will be stored. + /// File system to load/upload checkpoint data + [Inject] + private IMRUCheckpointHandler( + [Parameter(typeof(CheckpointFilePath))] string checkpointFilePath, + IFileSystem fileSystem) + { + _fileSystem = fileSystem; + _checkpointFileUrl = _fileSystem.CreateUriForPath(checkpointFilePath); + _localFile = "local" + Guid.NewGuid(); + Logger.Log(Level.Info, "############ state file path: {0}, localFile: {1}", checkpointFilePath, _localFile); + } + + /// + /// Save serialized checkpoint data to remote checkpoint file. + /// + /// + /// + public void Persistent(ITaskState taskState, ICodec codec) + { + var data = codec.Encode(taskState); + File.WriteAllBytes(_localFile, data); + + if (_fileSystem.Exists(_checkpointFileUrl)) + { + _fileSystem.Delete(_checkpointFileUrl); + } + + _fileSystem.CopyFromLocal(_localFile, _checkpointFileUrl); + } + + /// + /// Read checkpoint data and deserialize it into ITaskState object. 
+ /// + /// + /// + public ITaskState Restore(ICodec codec) + { + if (_fileSystem.Exists(_checkpointFileUrl)) + { + _fileSystem.CopyToLocal(_checkpointFileUrl, _localFile); + var currentState = File.ReadAllBytes(_localFile); + return codec.Decode(currentState); + } + return null; + } + + /// + /// Delete checkpoint file if it exists. It should be only called once at begining of task initialization. + /// + public void Reset() + { + if (_fileSystem.Exists(_checkpointFileUrl)) + { + _fileSystem.Delete(_checkpointFileUrl); + } + } + } +} \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs index 44e3f55c4e..9b7834ed03 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs @@ -144,6 +144,8 @@ IEnumerable IIMRUClient.Submit @@ -234,5 +246,13 @@ internal IConfiguration ResultHandlerConfiguration { get { return _resultHandlerConfiguration; } } + + /// + /// Configuration of checkpoint + /// + internal IConfiguration CheckpointConfiguration + { + get { return _checkpointConfiguration; } + } } } \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs index 3db1bcd4c1..674fbaa1cf 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs @@ -1054,6 +1054,7 @@ private IConfiguration GetMasterTaskConfiguration(string taskId) .Build(), _configurationManager.UpdateFunctionConfiguration, _configurationManager.ResultHandlerConfiguration, + _configurationManager.CheckpointConfiguration, GetGroupCommConfiguration()) .BindNamedParameter(GenericType.Class, _invokeGC.ToString()) .Build(); diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedCheckpointConfiguration.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedCheckpointConfiguration.cs new file mode 100644 index 0000000000..e06363306c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedCheckpointConfiguration.cs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IMRU.OnREEF.Parameters +{ + [NamedParameter("The serialized configuration for checkpoint.")] + internal sealed class SerializedCheckpointConfiguration : Name + { + } +} \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index 99d2665006..58a8276dc6 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -1,4 +1,4 @@ - + + diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs index d60b4c7d83..6895a950bc 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs @@ -41,20 +41,5 @@ public interface IIMRUCheckpointHandler /// /// ITaskState Restore(ICodec codec); - - /// - /// Set a mark to indicate the result has been handled. - /// - void SetResult(); - - /// - /// Check if the result has been handled. - /// - bool GetResult(); - - /// - /// Reset persistent state - /// - void Reset(); } } \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointResultHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointResultHandler.cs new file mode 100644 index 0000000000..0a0bb8da3a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointResultHandler.cs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IMRU.API +{ + /// + /// Internal interface that defineds methods for checkpoint. + /// + [DefaultImplementation(typeof(IMRUCheckpointResultHandler))] + internal interface IIMRUCheckpointResultHandler + { + /// + /// Set a flag to indicate the result has been handled. + /// + void SetResult(); + + /// + /// Check if the result flag has been set. + /// + bool GetResult(); + + /// + /// Clear state files + /// + void Clear(); + } +} diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs index dddf3dc80f..c813068e4f 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs @@ -17,6 +17,7 @@ using System; using System.IO; +using System.Linq; using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; using Org.Apache.REEF.IO.FileSystem; @@ -34,12 +35,12 @@ public class IMRUCheckpointHandler : IIMRUCheckpointHandler private static readonly Logger Logger = Logger.GetLogger(typeof(IMRUCheckpointHandler)); private readonly IFileSystem _fileSystem; - private readonly Uri _checkpointFileUrl; - private readonly string _localFile; - private readonly Uri _resultFileUrl; - private readonly string _resultLocalFile; private readonly string _checkpointFilePath; - private const string Done = 
"done"; + private const string StateDir = "StateDir"; + private const string StateFile = "StateFile"; + private const string FlagFile = "FlagFile"; + public const string StateFileExt = ".bin"; + public const string FlagFileExt = ".txt"; /// /// It is for storing and retrieving checkpoint data. @@ -53,15 +54,8 @@ private IMRUCheckpointHandler( { _fileSystem = fileSystem; _checkpointFilePath = checkpointFilePath; - _localFile = "local" + Guid.NewGuid(); - _resultLocalFile = "local" + Guid.NewGuid(); - if (!string.IsNullOrEmpty(_checkpointFilePath)) - { - _checkpointFileUrl = _fileSystem.CreateUriForPath(checkpointFilePath); - _resultFileUrl = _fileSystem.CreateUriForPath(checkpointFilePath + "result"); - } - Logger.Log(Level.Info, "State file path: {0}, localFile: {1}", checkpointFilePath, _localFile); + Logger.Log(Level.Info, "State file path: {0}", checkpointFilePath); } /// @@ -71,18 +65,26 @@ private IMRUCheckpointHandler( /// public void Persistent(ITaskState taskState, ICodec codec) { + var localStateFile = Path.GetTempPath() + Guid.NewGuid().ToString("N").Substring(0, 4); + var localFlagfile = Path.GetTempPath() + Guid.NewGuid().ToString("N").Substring(0, 4); + + string tick = DateTime.Now.Ticks.ToString(); + string stateFileDir = Path.Combine(_checkpointFilePath, StateDir + tick); + string remoteStateFileName = Path.Combine(stateFileDir, StateFile + tick + StateFileExt); + string remoteFlagFileName = Path.Combine(stateFileDir, FlagFile + tick + FlagFileExt); + + var stateFileUri = _fileSystem.CreateUriForPath(remoteStateFileName); + var flagFileUri = _fileSystem.CreateUriForPath(remoteFlagFileName); + var data = codec.Encode(taskState); - File.WriteAllBytes(_localFile, data); + File.WriteAllBytes(localStateFile, data); + File.WriteAllText(localFlagfile, remoteStateFileName); - if (!string.IsNullOrEmpty(_checkpointFilePath)) - { - if (_fileSystem.Exists(_checkpointFileUrl)) - { - _fileSystem.Delete(_checkpointFileUrl); - } + 
_fileSystem.CopyFromLocal(localStateFile, stateFileUri); + _fileSystem.CopyFromLocal(localFlagfile, flagFileUri); - _fileSystem.CopyFromLocal(_localFile, _checkpointFileUrl); - } + File.Delete(localStateFile); + File.Delete(localFlagfile); } /// @@ -92,51 +94,35 @@ public void Persistent(ITaskState taskState, ICodec codec) /// public ITaskState Restore(ICodec codec) { - if (!string.IsNullOrEmpty(_checkpointFilePath) && _fileSystem.Exists(_checkpointFileUrl)) + if (!string.IsNullOrEmpty(_checkpointFilePath)) { - _fileSystem.CopyToLocal(_checkpointFileUrl, _localFile); - var currentState = File.ReadAllBytes(_localFile); - return codec.Decode(currentState); - } - return null; - } + var files = _fileSystem.GetChildren(_fileSystem.CreateUriForPath(_checkpointFilePath)); + if (files != null) + { + var flagFiles = files.Where(f => f.AbsolutePath.Contains(FlagFile)); + var uris = flagFiles.OrderByDescending(ff => _fileSystem.GetFileStatus(ff).ModificationTime); - public void SetResult() - { - Logger.Log(Level.Info, "SetResult to file {0}", _resultFileUrl); + Uri latestFlagFile = uris.FirstOrDefault(); + if (latestFlagFile != null) + { + var localLatestFlagfile = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N").Substring(0, 4)); + var localLatestStatefile = Path.Combine(Path.GetTempPath() + Guid.NewGuid().ToString("N").Substring(0, 4)); - if (!string.IsNullOrEmpty(_checkpointFilePath) && !_fileSystem.Exists(_resultFileUrl)) - { - File.WriteAllText(_resultLocalFile, Done); - _fileSystem.CopyFromLocal(_resultLocalFile, _resultFileUrl); - } - } + _fileSystem.CopyToLocal(latestFlagFile, localLatestFlagfile); + string latestStateFile = File.ReadAllText(localLatestFlagfile); + Logger.Log(Level.Info, "latestStateFile -- : {0}", latestStateFile); + var latestStateFileUri = _fileSystem.CreateUriForPath(latestStateFile); + _fileSystem.CopyToLocal(latestStateFileUri, localLatestStatefile); + var currentState = File.ReadAllBytes(localLatestStatefile); - public bool 
GetResult() - { - if (!string.IsNullOrEmpty(_checkpointFilePath) && _fileSystem.Exists(_resultFileUrl)) - { - _fileSystem.CopyToLocal(_resultFileUrl, _resultLocalFile); - var result = File.ReadAllText(_resultLocalFile); - Logger.Log(Level.Info, "GetResult: {0}", result); - return Done.Equals(result); - } - return false; - } + File.Delete(localLatestFlagfile); + File.Delete(localLatestStatefile); - /// - /// Delete checkpoint file if it exists. It should be only called once at begining of task initialization. - /// - public void Reset() - { - if (!string.IsNullOrEmpty(_checkpointFilePath) && _fileSystem.Exists(_checkpointFileUrl)) - { - _fileSystem.Delete(_checkpointFileUrl); - } - if (!string.IsNullOrEmpty(_checkpointFilePath) && _fileSystem.Exists(_resultFileUrl)) - { - _fileSystem.Delete(_resultFileUrl); + return codec.Decode(currentState); + } + } } + return null; } } } \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointResultHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointResultHandler.cs new file mode 100644 index 0000000000..96592a7b9e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointResultHandler.cs @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.IO; +using System.Linq; +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IO.FileSystem; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler +{ + /// + /// Internal class that handles checkpoint for result flag. + /// + internal class IMRUCheckpointResultHandler : IIMRUCheckpointResultHandler + { + private static readonly Logger Logger = Logger.GetLogger(typeof(IIMRUCheckpointResultHandler)); + + private const string RresultDir = "RresultDir"; + private const string RresultFile = "result.txt"; + private const string Done = "done"; + + private readonly IFileSystem _fileSystem; + private readonly Uri _resultFileUrl; + private readonly string _checkpointFilePath; + + /// + /// It is for storing and retrieving checkpoint result flag. + /// + /// The file path where the checkpoint data will be stored. + /// File system to load/upload checkpoint data + [Inject] + private IMRUCheckpointResultHandler( + [Parameter(typeof(CheckpointFilePath))] string checkpointFilePath, + IFileSystem fileSystem) + { + _fileSystem = fileSystem; + _checkpointFilePath = checkpointFilePath; + + if (!string.IsNullOrEmpty(_checkpointFilePath)) + { + string resultFile = Path.Combine(_checkpointFilePath, RresultDir, RresultFile); + _resultFileUrl = _fileSystem.CreateUriForPath(resultFile); + } + Logger.Log(Level.Info, "############ state file path: {0}", checkpointFilePath); + } + + /// + /// Set flag to show result is already written. 
+ /// + public void SetResult() + { + if (!string.IsNullOrEmpty(_checkpointFilePath) && !_fileSystem.Exists(_resultFileUrl)) + { + var resultLocalFile = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N").Substring(0, 4)); + File.WriteAllText(resultLocalFile, Done); + _fileSystem.CopyFromLocal(resultLocalFile, _resultFileUrl); + File.Delete(resultLocalFile); + } + } + + /// + /// Retrieve the result flag. + /// + /// + public bool GetResult() + { + if (!string.IsNullOrEmpty(_checkpointFilePath) && _fileSystem.Exists(_resultFileUrl)) + { + var resultLocalFile = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N").Substring(0, 4)); + _fileSystem.CopyToLocal(_resultFileUrl, resultLocalFile); + var result = File.ReadAllText(resultLocalFile); + Logger.Log(Level.Info, "GetResult: {0}", result); + return Done.Equals(result); + } + return false; + } + + /// + /// Clear checkpoint files in checkpointFilePath. It should be only called once for the entire job. + /// + public void Clear() + { + if (!string.IsNullOrEmpty(_checkpointFilePath)) + { + var filesToRemove = _fileSystem.GetChildren(_fileSystem.CreateUriForPath(_checkpointFilePath)); + var streamToRemove = filesToRemove.Where(f => IsStateFile(f.AbsolutePath)); + foreach (var stream in streamToRemove) + { + _fileSystem.Delete(stream); + } + } + } + + private bool IsStateFile(string path) + { + return path.EndsWith(IMRUCheckpointHandler.StateFileExt) || path.EndsWith(IMRUCheckpointHandler.FlagFileExt); + } + } +} \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs index 146598d762..dbfe131de8 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs @@ -46,7 +46,7 @@ internal sealed class UpdateTaskHost : TaskHostB private readonly IBroadcastSender> _dataAndControlMessageSender; private readonly 
IUpdateFunction _updateTask; private readonly IIMRUResultHandler _resultHandler; - private readonly IIMRUCheckpointHandler _checkpointHandler; + private readonly IIMRUCheckpointResultHandler _checkpointResultHandler; /// /// It indicates if the update task has completed and result has been written. @@ -58,7 +58,7 @@ internal sealed class UpdateTaskHost : TaskHostB /// The UpdateTask hosted in this REEF Task. /// Used to setup the communications. /// Result handler - /// Checkpoint handler + /// Checkpoint handler /// Task close Coordinator /// Whether to call Garbage Collector after each iteration or not /// task id @@ -69,7 +69,7 @@ private UpdateTaskHost( IIMRUResultHandler resultHandler, TaskCloseCoordinator taskCloseCoordinator, [Parameter(typeof(InvokeGC))] bool invokeGc, - IIMRUCheckpointHandler checkpointHandler, + IIMRUCheckpointResultHandler checkpointResultHandler, [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId) : base(groupCommunicationsClient, taskCloseCoordinator, invokeGc) { @@ -79,7 +79,14 @@ private UpdateTaskHost( _communicationGroupClient.GetBroadcastSender>(IMRUConstants.BroadcastOperatorName); _dataReceiver = _communicationGroupClient.GetReduceReceiver(IMRUConstants.ReduceOperatorName); _resultHandler = resultHandler; - _checkpointHandler = checkpointHandler; + _checkpointResultHandler = checkpointResultHandler; + + var taskIdSplit = taskId.Split('-'); + var retryIndex = int.Parse(taskIdSplit[taskIdSplit.Length - 1]); + if (retryIndex == 0) + { + _checkpointResultHandler.Clear(); + } Logger.Log(Level.Info, "$$$$_resultHandler." 
+ _resultHandler.GetType().AssemblyQualifiedName); Logger.Log(Level.Info, "UpdateTaskHost initialized."); } @@ -102,10 +109,10 @@ protected override byte[] TaskBody(byte[] memento) if (updateResult.HasResult) { - if (!_checkpointHandler.GetResult()) + if (!_checkpointResultHandler.GetResult()) { _resultHandler.HandleResult(updateResult.Result); - _checkpointHandler.SetResult(); + _checkpointResultHandler.SetResult(); } _done = true; } @@ -135,7 +142,7 @@ protected override byte[] TaskBody(byte[] memento) { _resultHandler.HandleResult(updateResult.Result); _done = true; - _checkpointHandler.SetResult(); + _checkpointResultHandler.SetResult(); } } catch (Exception e) diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index 58a8276dc6..059b88d840 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -48,6 +48,7 @@ under the License. Properties\SharedAssemblyInfo.cs + @@ -65,6 +66,7 @@ under the License. 
+ diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestLocalFileSystem.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestLocalFileSystem.cs index cd174695ba..1a91fa281b 100644 --- a/lang/cs/Org.Apache.REEF.IO.Tests/TestLocalFileSystem.cs +++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestLocalFileSystem.cs @@ -91,6 +91,53 @@ public void TestCopyFromLocal() Assert.False(File.Exists(sourceFilePath)); } + [Fact] + public void TestCopyFromLocalToNewFolder() + { + var fs = GetFileSystem(); + var sourceFilePath = Path.Combine(Path.GetTempPath(), TempFileName); + MakeLocalTestFile(sourceFilePath); + + var destinationFilePath = Path.Combine(Path.GetTempPath(), "test" + Guid.NewGuid().ToString("N"), TempFileName + ".copy"); + if (File.Exists(destinationFilePath)) + { + File.Delete(destinationFilePath); + } + + var destinationUri = new Uri(destinationFilePath); + fs.CopyFromLocal(sourceFilePath, destinationUri); + TestRemoteFile(fs, destinationUri); + + fs.Delete(destinationUri); + Assert.False(fs.Exists(destinationUri)); + + File.Delete(sourceFilePath); + Assert.False(File.Exists(sourceFilePath)); + } + + [Fact] + public void TestCopyToLocalNewFolder() + { + var fs = GetFileSystem(); + var sourceFilePath = Path.Combine(Path.GetTempPath(), TempFileName); + var sourceUri = new Uri(sourceFilePath); + var destinationFilePath = Path.Combine(Path.GetTempPath(), "Test" + Guid.NewGuid().ToString("N").Substring(0, 4), TempFileName + ".copy"); + if (File.Exists(destinationFilePath)) + { + File.Delete(destinationFilePath); + } + + MakeRemoteTestFile(fs, sourceUri); + fs.CopyToLocal(sourceUri, destinationFilePath); + TestLocalFile(destinationFilePath); + + fs.Delete(sourceUri); + Assert.False(fs.Exists(sourceUri)); + + File.Delete(destinationFilePath); + Assert.False(File.Exists(destinationFilePath)); + } + [Fact] public void TestCopyToLocal() { @@ -157,6 +204,27 @@ public void TestGetChildren() fs.DeleteDirectory(directoryUri); } + [Fact] + public void TestGetChildrenFromNewFolder() + { + var fs = 
GetFileSystem(); + var path = new Uri(Path.Combine(Path.GetTempPath(), "Test" + Guid.NewGuid().ToString("N").Substring(0, 4))); + var children = fs.GetChildren(path); + Assert.Equal(0, children.Count()); + } + + [Fact] + public void TestGetChildrenFromEmptyFolder() + { + var fs = GetFileSystem(); + var directoryUri = new Uri(Path.Combine(Path.Combine(Path.GetTempPath(), "Test" + Guid.NewGuid().ToString("N").Substring(0, 4)))); + fs.CreateDirectory(directoryUri); + var fileUris = fs.GetChildren(directoryUri).ToList(); + + Assert.Equal(0, fileUris.Count); + fs.DeleteDirectory(directoryUri); + } + private IFileSystem GetFileSystem() { return TangFactory.GetTang() diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Local/LocalFileSystem.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Local/LocalFileSystem.cs index 69a33ad39e..9d8eeb978c 100644 --- a/lang/cs/Org.Apache.REEF.IO/FileSystem/Local/LocalFileSystem.cs +++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Local/LocalFileSystem.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using System.Collections.ObjectModel; using System.IO; using System.Linq; using Org.Apache.REEF.Tang.Annotations; @@ -92,11 +93,13 @@ public void Copy(Uri sourceUri, Uri destinationUri) public void CopyToLocal(Uri remoteFileUri, string localName) { + EnsureDirectoryExists(localName); File.Copy(remoteFileUri.LocalPath, localName); } public void CopyFromLocal(string localFileName, Uri remoteFileUri) { + EnsureDirectoryExists(remoteFileUri.LocalPath); File.Copy(localFileName, remoteFileUri.LocalPath); } @@ -113,8 +116,25 @@ public void DeleteDirectory(Uri directoryUri) public IEnumerable GetChildren(Uri directoryUri) { var localPath = Path.GetFullPath(directoryUri.LocalPath); - return Directory.GetFileSystemEntries(localPath) - .Select(entry => new Uri(Path.Combine(localPath, entry))); + try + { + return Directory.GetFileSystemEntries(localPath, "*", SearchOption.AllDirectories) + .Select(entry => new Uri(Path.Combine(localPath, 
entry))); + } + catch (DirectoryNotFoundException) + { + return new Collection(); + } + } + + private static void EnsureDirectoryExists(string filePath) + { + var parts = filePath.Split('\\'); + var directory = filePath.Substring(0, filePath.Length - parts[parts.Length - 1].Length); + if (!Directory.Exists(directory)) + { + Directory.CreateDirectory(directory); + } } } } \ No newline at end of file From eeb766a347455c5d6aff99484a1376f167085ff2 Mon Sep 17 00:00:00 2001 From: jwang98052 Date: Tue, 13 Mar 2018 18:21:06 -0700 Subject: [PATCH 4/4] address review comments and handle failure case if state file corrupt --- .../PipelinedBroadcastAndReduce.cs | 2 +- ...inedBroadcastAndReduceWithFaultTolerant.cs | 25 +++--- .../UpdateTaskStateCodec.cs | 17 ++++- .../TestCheckpointHandler.cs | 68 ++++++++++++++--- .../API/IIMRUCheckpointHandler.cs | 8 +- .../API/IIMRUCheckpointResultHandler.cs | 4 +- .../CheckpointConfigurationModule.cs | 21 ++++- .../IMRUCheckpointHandler.cs | 76 ++++++++++++------- .../IMRUCheckpointResultHandler.cs | 7 +- .../OnREEF/Driver/ConfigurationManager.cs | 11 +-- .../OnREEF/IMRUTasks/UpdateTaskHost.cs | 7 +- .../CheckpointFilePath.cs | 6 +- .../Org.Apache.REEF.IMRU.csproj | 2 +- .../FileSystem/Local/LocalFileSystem.cs | 3 +- .../IMRU/TestFailMapperEvaluators.cs | 9 ++- 15 files changed, 176 insertions(+), 90 deletions(-) rename lang/cs/Org.Apache.REEF.IMRU/OnREEF/{CheckpointHandler => Parameters}/CheckpointFilePath.cs (89%) diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs index cfc0d970d0..1fb76a543b 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs @@ -31,7 +31,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce /// /// IMRU 
program that performs broadcast and reduce /// - public class PipelinedBroadcastAndReduce + internal class PipelinedBroadcastAndReduce { protected readonly IIMRUClient _imruClient; diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduceWithFaultTolerant.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduceWithFaultTolerant.cs index 20773d944f..479599c114 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduceWithFaultTolerant.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduceWithFaultTolerant.cs @@ -26,6 +26,7 @@ using Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler; using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; using Org.Apache.REEF.IMRU.OnREEF.Parameters; +using Org.Apache.REEF.IO.TempFileCreation; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; @@ -39,7 +40,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce /// /// IMRU program that performs broadcast and reduce with fault tolerance. 
/// - public sealed class PipelinedBroadcastAndReduceWithFaultTolerant : PipelinedBroadcastAndReduce + internal sealed class PipelinedBroadcastAndReduceWithFaultTolerant : PipelinedBroadcastAndReduce { private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedBroadcastAndReduceWithFaultTolerant)); @@ -118,11 +119,11 @@ protected override IConfiguration BuildUpdateFunctionConfig() /// protected override IConfiguration BuildCheckpointConfig() { - var filePath = Path.Combine(Path.GetTempPath(), Guid.NewGuid() + "state.txt"); + var filePath = TangFactory.GetTang().NewInjector().GetInstance().CreateTempDirectory("statefiles", string.Empty); - return CheckpointConfigurationModule.ConfigurationModule - .Set(CheckpointConfigurationModule.CheckpointFile, filePath) - .Set(CheckpointConfigurationModule.TaskStateCodec, GenericType.Class) + return CheckpointConfigurationBuilder.ConfigurationModule + .Set(CheckpointConfigurationBuilder.CheckpointFilePath, filePath) + .Set(CheckpointConfigurationBuilder.TaskStateCodec, GenericType.Class) .Build(); } @@ -309,7 +310,6 @@ internal sealed class BroadcastSenderReduceReceiverUpdateFunctionFT : IUpdateFun private readonly int _workers; private readonly UpdateTaskState _taskState; private readonly IIMRUCheckpointHandler _stateHandler; - private readonly ICodec _stateCodec; [Inject] private BroadcastSenderReduceReceiverUpdateFunctionFT( @@ -318,7 +318,6 @@ private BroadcastSenderReduceReceiverUpdateFunctionFT( [Parameter(typeof(BroadcastReduceConfiguration.NumWorkers))] int numWorkers, [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, IIMRUCheckpointHandler stateHandler, - ICodec stateCodec, ITaskState taskState) { _maxIters = maxIters; @@ -329,7 +328,6 @@ private BroadcastSenderReduceReceiverUpdateFunctionFT( _taskState = (UpdateTaskState)taskState; _stateHandler = stateHandler; - _stateCodec = stateCodec; int retryNumber; int.TryParse(taskId[taskId.Length - 1].ToString(), out retryNumber); @@ -431,23 
+429,22 @@ private void Restore(int[] d) private void PersistState() { - Logger.Log(Level.Info, "$$$$$$$$$$$ State to save: {0}", _taskState.Input[0]); - Logger.Log(Level.Info, "$$$$$$$$$$$ SaveState:currentState: {0}", JsonConvert.SerializeObject(_taskState)); - - _stateHandler.Persistent(_taskState, _stateCodec); + Logger.Log(Level.Info, "SaveState:currentState: {0}", JsonConvert.SerializeObject(_taskState)); + _stateHandler.Persist(_taskState); } private void RestoreState() { - var obj = (UpdateTaskState)_stateHandler.Restore(_stateCodec); + var obj = (UpdateTaskState)_stateHandler.Restore(); if (obj != null) { Logger.Log(Level.Info, - "$$$$$$$$$$$ restoreState:DeserializeObject: input: {0}, iteration: {1}, result: {2}.", + "RestoreState:DeserializeObject: input: {0}, iteration: {1}, result: {2}.", obj.Input == null ? string.Empty : string.Join(",", obj.Input), obj.Iterations, obj.Result == null ? string.Empty : string.Join(",", obj.Result)); + _taskState.Update(obj); } } diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskStateCodec.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskStateCodec.cs index bb314dfb1f..5a994bfe69 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskStateCodec.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskStateCodec.cs @@ -23,19 +23,32 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce { - public class UpdateTaskStateCodec : ICodec + /// + /// Codec for Update State + /// + internal sealed class UpdateTaskStateCodec : ICodec { [Inject] - UpdateTaskStateCodec() + private UpdateTaskStateCodec() { } + /// + /// Deserialize bytes into ITaskState object. + /// + /// + /// public ITaskState Decode(byte[] data) { var str = ByteUtilities.ByteArraysToString(data); return JsonConvert.DeserializeObject>(str); } + /// + /// Serialize ITaskState in to bytes. 
+ /// + /// + /// public byte[] Encode(ITaskState taskState) { var state = JsonConvert.SerializeObject(taskState); diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestCheckpointHandler.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestCheckpointHandler.cs index d127799409..a53b383374 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestCheckpointHandler.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestCheckpointHandler.cs @@ -17,11 +17,14 @@ using System; using System.IO; +using System.Linq; using System.Runtime.Serialization; using Newtonsoft.Json; using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler; using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; +using Org.Apache.REEF.IO.FileSystem; +using Org.Apache.REEF.IO.TempFileCreation; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; @@ -34,41 +37,86 @@ namespace Org.Apache.REEF.IMRU.Tests { public class TestCheckpointHandler { + /// + /// Test persist and restore. 
+ /// [Fact] public void TestPersistent() { var injector = TangFactory.GetTang().NewInjector(BuildCheckpointConfig()); var checkpointHandler = injector.GetInstance(); var checkpointResultHandler = injector.GetInstance(); - var codec = injector.GetInstance>(); // Test to clean a non existing folder checkpointResultHandler.Clear(); var taskState = (TestTaskState)TangFactory.GetTang().NewInjector(TaskStateConfiguration()).GetInstance(); taskState.Iterations = 5; - checkpointHandler.Persistent(taskState, codec); + checkpointHandler.Persist(taskState); taskState.Iterations = 10; - checkpointHandler.Persistent(taskState, codec); + checkpointHandler.Persist(taskState); - var state = checkpointHandler.Restore(codec); + var state = checkpointHandler.Restore(); var testTaskState = (TestTaskState)state; Assert.Equal(testTaskState.Iterations, 10); - checkpointResultHandler.SetResult(); - var r = checkpointResultHandler.GetResult(); + checkpointResultHandler.MarkResulHandled(); + var r = checkpointResultHandler.IsResultHandled(); Assert.True(r); checkpointResultHandler.Clear(); } + /// + /// Test last state file corrupted during the restore. 
+ /// + [Fact] + public void TestLastFileCorrupted() + { + var filePath = TangFactory.GetTang().NewInjector().GetInstance().CreateTempDirectory("statefiles", string.Empty); + + var config = CheckpointConfigurationBuilder.ConfigurationModule + .Set(CheckpointConfigurationBuilder.CheckpointFilePath, filePath) + .Set(CheckpointConfigurationBuilder.TaskStateCodec, GenericType.Class) + .Build(); + + var injector = TangFactory.GetTang().NewInjector(config); + var checkpointHandler = injector.GetInstance(); + var checkpointResultHandler = injector.GetInstance(); + var fileSystem = injector.GetInstance(); + + // Test to clean a non existing folder + checkpointResultHandler.Clear(); + + // Write two state files + var taskState = (TestTaskState)TangFactory.GetTang().NewInjector(TaskStateConfiguration()).GetInstance(); + taskState.Iterations = 5; + checkpointHandler.Persist(taskState); + taskState.Iterations = 10; + checkpointHandler.Persist(taskState); + + // Delete the last state file + var files = fileSystem.GetChildren(fileSystem.CreateUriForPath(filePath)); + var flagFiles = files.Where(f => f.AbsolutePath.Contains("FlagFile")); + var uris = flagFiles.OrderByDescending(ff => fileSystem.GetFileStatus(ff).ModificationTime).ToList(); + Uri latestFlagFile = uris.FirstOrDefault(); + var localLatestFlagfile = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N").Substring(0, 4)); + fileSystem.CopyToLocal(latestFlagFile, localLatestFlagfile); + string latestStateFile = File.ReadAllText(localLatestFlagfile); + fileSystem.Delete(fileSystem.CreateUriForPath(latestStateFile)); + + var state = checkpointHandler.Restore(); + var testTaskState = (TestTaskState)state; + Assert.Equal(testTaskState.Iterations, 5); + } + protected IConfiguration BuildCheckpointConfig() { - var filePath = Path.Combine(Path.GetTempPath(), "teststatepath" + Guid.NewGuid().ToString("N").Substring(0, 4)); + var filePath = 
TangFactory.GetTang().NewInjector().GetInstance().CreateTempDirectory("statefiles", string.Empty); - return CheckpointConfigurationModule.ConfigurationModule - .Set(CheckpointConfigurationModule.CheckpointFile, filePath) - .Set(CheckpointConfigurationModule.TaskStateCodec, GenericType.Class) + return CheckpointConfigurationBuilder.ConfigurationModule + .Set(CheckpointConfigurationBuilder.CheckpointFilePath, filePath) + .Set(CheckpointConfigurationBuilder.TaskStateCodec, GenericType.Class) .Build(); } diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs index 6895a950bc..3036727cea 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs @@ -23,7 +23,7 @@ namespace Org.Apache.REEF.IMRU.API { /// - /// It is responsible for save and restore ITaskState object with the given ICodec + /// It is responsible for save and restore ITaskState object. /// [DefaultImplementation(typeof(IMRUCheckpointHandler))] public interface IIMRUCheckpointHandler @@ -32,14 +32,12 @@ public interface IIMRUCheckpointHandler /// Persistent ITaskState object with the given ICodec. /// /// - /// - void Persistent(ITaskState taskState, ICodec codec); + void Persist(ITaskState taskState); /// /// Restore the data and decode it with the given ICodec. /// - /// /// - ITaskState Restore(ICodec codec); + ITaskState Restore(); } } \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointResultHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointResultHandler.cs index 0a0bb8da3a..b4dcda9a1c 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointResultHandler.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointResultHandler.cs @@ -29,12 +29,12 @@ internal interface IIMRUCheckpointResultHandler /// /// Set a flag to indicate the result has been handled. 
+ /// - void SetResult(); + void MarkResulHandled(); /// /// Check if the result flag has been set. /// - bool GetResult(); + bool IsResultHandled(); /// /// Clear state files diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointConfigurationModule.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointConfigurationModule.cs index 95d66e147f..a865b6ba23 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointConfigurationModule.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointConfigurationModule.cs @@ -17,20 +17,33 @@ using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; +using Org.Apache.REEF.IMRU.OnREEF.Parameters; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Remote; namespace Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler { - public sealed class CheckpointConfigurationModule : ConfigurationModuleBuilder + /// + /// Configuration module builder for checkpoint handler + /// + public sealed class CheckpointConfigurationBuilder : ConfigurationModuleBuilder { - public static readonly RequiredParameter CheckpointFile = new RequiredParameter(); + /// + /// The checkpoint state file path. In a cluster, this should be a path in remote storage. + /// + public static readonly RequiredParameter CheckpointFilePath = new RequiredParameter(); + /// + /// Codec that is to encode and decode state object. + /// + public static readonly RequiredImpl> TaskStateCodec = new RequiredImpl>(); - public static readonly ConfigurationModule ConfigurationModule = new CheckpointConfigurationModule() - .BindNamedParameter(GenericType.Class, CheckpointFile) + /// + /// Configuration module for checkpoint handler.
+ /// + public static readonly ConfigurationModule ConfigurationModule = new CheckpointConfigurationBuilder() + .BindNamedParameter(GenericType.Class, CheckpointFilePath) .BindImplementation(GenericType.Class, GenericType.Class) .BindImplementation(GenericType>.Class, TaskStateCodec) .Build(); diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs index c813068e4f..72598a2601 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs @@ -16,12 +16,16 @@ // under the License. using System; +using System.Collections.Generic; using System.IO; using System.Linq; using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; +using Org.Apache.REEF.IMRU.OnREEF.Parameters; using Org.Apache.REEF.IO.FileSystem; +using Org.Apache.REEF.IO.TempFileCreation; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Remote; @@ -30,12 +34,14 @@ namespace Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler /// /// Default implementation of IIMRUCheckpointHandler /// - public class IMRUCheckpointHandler : IIMRUCheckpointHandler + public sealed class IMRUCheckpointHandler : IIMRUCheckpointHandler { private static readonly Logger Logger = Logger.GetLogger(typeof(IMRUCheckpointHandler)); private readonly IFileSystem _fileSystem; + private readonly ICodec _stateCodec; private readonly string _checkpointFilePath; + private readonly Uri _checkpointFileUri; private const string StateDir = "StateDir"; private const string StateFile = "StateFile"; private const string FlagFile = "FlagFile"; @@ -47,14 +53,17 @@ public class IMRUCheckpointHandler : IIMRUCheckpointHandler /// /// The file path where the checkpoint data will be stored. 
/// File system to load/upload checkpoint data + /// Codec that is used for decoding and encoding for State object. [Inject] private IMRUCheckpointHandler( [Parameter(typeof(CheckpointFilePath))] string checkpointFilePath, + ICodec stateCodec, IFileSystem fileSystem) { _fileSystem = fileSystem; + _stateCodec = stateCodec; _checkpointFilePath = checkpointFilePath; - + _checkpointFileUri = _fileSystem.CreateUriForPath(_checkpointFilePath); Logger.Log(Level.Info, "State file path: {0}", checkpointFilePath); } @@ -62,11 +71,10 @@ private IMRUCheckpointHandler( /// Save serialized checkpoint data to remote checkpoint file. /// /// - /// - public void Persistent(ITaskState taskState, ICodec codec) + public void Persist(ITaskState taskState) { - var localStateFile = Path.GetTempPath() + Guid.NewGuid().ToString("N").Substring(0, 4); - var localFlagfile = Path.GetTempPath() + Guid.NewGuid().ToString("N").Substring(0, 4); + var localStateFile = TangFactory.GetTang().NewInjector().GetInstance().GetTempFileName("statefile", string.Empty); + var localFlagfile = TangFactory.GetTang().NewInjector().GetInstance().GetTempFileName("flagfile", string.Empty); string tick = DateTime.Now.Ticks.ToString(); string stateFileDir = Path.Combine(_checkpointFilePath, StateDir + tick); @@ -76,7 +84,7 @@ public void Persistent(ITaskState taskState, ICodec codec) var stateFileUri = _fileSystem.CreateUriForPath(remoteStateFileName); var flagFileUri = _fileSystem.CreateUriForPath(remoteFlagFileName); - var data = codec.Encode(taskState); + var data = _stateCodec.Encode(taskState); File.WriteAllBytes(localStateFile, data); File.WriteAllText(localFlagfile, remoteStateFileName); @@ -90,36 +98,52 @@ public void Persistent(ITaskState taskState, ICodec codec) /// /// Read checkpoint data and deserialize it into ITaskState object. 
/// - /// /// - public ITaskState Restore(ICodec codec) + public ITaskState Restore() { if (!string.IsNullOrEmpty(_checkpointFilePath)) { - var files = _fileSystem.GetChildren(_fileSystem.CreateUriForPath(_checkpointFilePath)); + var files = _fileSystem.GetChildren(_checkpointFileUri); if (files != null) { var flagFiles = files.Where(f => f.AbsolutePath.Contains(FlagFile)); - var uris = flagFiles.OrderByDescending(ff => _fileSystem.GetFileStatus(ff).ModificationTime); + var uris = flagFiles.OrderByDescending(ff => _fileSystem.GetFileStatus(ff).ModificationTime).ToList(); - Uri latestFlagFile = uris.FirstOrDefault(); - if (latestFlagFile != null) - { - var localLatestFlagfile = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N").Substring(0, 4)); - var localLatestStatefile = Path.Combine(Path.GetTempPath() + Guid.NewGuid().ToString("N").Substring(0, 4)); + return Restore(uris); + } + } + return null; + } - _fileSystem.CopyToLocal(latestFlagFile, localLatestFlagfile); - string latestStateFile = File.ReadAllText(localLatestFlagfile); - Logger.Log(Level.Info, "latestStateFile -- : {0}", latestStateFile); - var latestStateFileUri = _fileSystem.CreateUriForPath(latestStateFile); - _fileSystem.CopyToLocal(latestStateFileUri, localLatestStatefile); - var currentState = File.ReadAllBytes(localLatestStatefile); + private ITaskState Restore(IList uris) + { + Uri latestFlagFile = uris.FirstOrDefault(); + if (latestFlagFile != null) + { + var localLatestStatefile = TangFactory.GetTang().NewInjector().GetInstance().GetTempFileName("statefile", string.Empty); + var localLatestFlagfile = TangFactory.GetTang().NewInjector().GetInstance().GetTempFileName("flagfile", string.Empty); - File.Delete(localLatestFlagfile); - File.Delete(localLatestStatefile); + _fileSystem.CopyToLocal(latestFlagFile, localLatestFlagfile); - return codec.Decode(currentState); - } + try + { + string latestStateFile = File.ReadAllText(localLatestFlagfile); + Logger.Log(Level.Info, "latestStateFile 
-- : {0}", latestStateFile); + var latestStateFileUri = _fileSystem.CreateUriForPath(latestStateFile); + _fileSystem.CopyToLocal(latestStateFileUri, localLatestStatefile); + var currentState = File.ReadAllBytes(localLatestStatefile); + return _stateCodec.Decode(currentState); + } + catch (Exception e) + { + Logger.Log(Level.Info, "Exception in restoring from state file. Possible file corruption {0}", e); + uris.RemoveAt(0); + return Restore(uris); + } + finally + { + File.Delete(localLatestFlagfile); + File.Delete(localLatestStatefile); } } return null; diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointResultHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointResultHandler.cs index 96592a7b9e..c627104e88 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointResultHandler.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointResultHandler.cs @@ -19,6 +19,7 @@ using System.IO; using System.Linq; using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.OnREEF.Parameters; using Org.Apache.REEF.IO.FileSystem; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities.Logging; @@ -58,13 +59,13 @@ private IMRUCheckpointResultHandler( string resultFile = Path.Combine(_checkpointFilePath, RresultDir, RresultFile); _resultFileUrl = _fileSystem.CreateUriForPath(resultFile); } - Logger.Log(Level.Info, "############ state file path: {0}", checkpointFilePath); + Logger.Log(Level.Info, "State file path: {0}", checkpointFilePath); } /// /// Set flag to show result is already written. /// - public void SetResult() + public void MarkResulHandled() { if (!string.IsNullOrEmpty(_checkpointFilePath) && !_fileSystem.Exists(_resultFileUrl)) { @@ -79,7 +80,7 @@ public void SetResult() /// Retrieve the result flag. 
/// /// - public bool GetResult() + public bool IsResultHandled() { if (!string.IsNullOrEmpty(_checkpointFilePath) && _fileSystem.Exists(_resultFileUrl)) { diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs index f198271e89..a49a5dcd9f 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs @@ -155,15 +155,8 @@ private ConfigurationManager( Exceptions.Throw(e, "Unable to deserialize map input pipeline data converter configuration", Logger); } - try - { - _checkpointConfiguration = - configurationSerializer.FromString(checkpointConfiguration); - } - catch (Exception e) - { - Exceptions.Throw(e, "Unable to deserialize checkpoint configuration", Logger); - } + _checkpointConfiguration = + configurationSerializer.FromString(checkpointConfiguration); } /// diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs index dbfe131de8..ec01d7e49a 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs @@ -87,7 +87,6 @@ private UpdateTaskHost( { _checkpointResultHandler.Clear(); } - Logger.Log(Level.Info, "$$$$_resultHandler." 
+ _resultHandler.GetType().AssemblyQualifiedName); Logger.Log(Level.Info, "UpdateTaskHost initialized."); } @@ -109,10 +108,10 @@ protected override byte[] TaskBody(byte[] memento) if (updateResult.HasResult) { - if (!_checkpointResultHandler.GetResult()) + if (!_checkpointResultHandler.IsResultHandled()) { _resultHandler.HandleResult(updateResult.Result); - _checkpointResultHandler.SetResult(); + _checkpointResultHandler.MarkResulHandled(); } _done = true; } @@ -142,7 +141,7 @@ protected override byte[] TaskBody(byte[] memento) { _resultHandler.HandleResult(updateResult.Result); _done = true; - _checkpointResultHandler.SetResult(); + _checkpointResultHandler.MarkResulHandled(); } } catch (Exception e) diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointFilePath.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CheckpointFilePath.cs similarity index 89% rename from lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointFilePath.cs rename to lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CheckpointFilePath.cs index 47b4597e35..1c786d34ac 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointFilePath.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CheckpointFilePath.cs @@ -17,10 +17,10 @@ using Org.Apache.REEF.Tang.Annotations; -namespace Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler +namespace Org.Apache.REEF.IMRU.OnREEF.Parameters { [NamedParameter("Checkpoint file name", "CheckpointFilePath", "")] - public class CheckpointFilePath : Name + public sealed class CheckpointFilePath : Name { } -} \ No newline at end of file +} diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index 059b88d840..f2f57d3bcd 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -64,7 +64,6 @@ under the License. - @@ -117,6 +116,7 @@ under the License. 
+ diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/Local/LocalFileSystem.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/Local/LocalFileSystem.cs index 9d8eeb978c..e69762f132 100644 --- a/lang/cs/Org.Apache.REEF.IO/FileSystem/Local/LocalFileSystem.cs +++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/Local/LocalFileSystem.cs @@ -129,8 +129,7 @@ public IEnumerable GetChildren(Uri directoryUri) private static void EnsureDirectoryExists(string filePath) { - var parts = filePath.Split('\\'); - var directory = filePath.Substring(0, filePath.Length - parts[parts.Length - 1].Length); + var directory = Path.GetDirectoryName(filePath); if (!Directory.Exists(directory)) { Directory.CreateDirectory(directory); diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs index 2121992952..a0c7fc8e50 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs @@ -23,6 +23,7 @@ using Org.Apache.REEF.IMRU.OnREEF.Driver; using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; using Org.Apache.REEF.IMRU.OnREEF.Parameters; +using Org.Apache.REEF.IO.TempFileCreation; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; @@ -203,11 +204,11 @@ protected override IConfiguration BuildUpdateFunctionConfigModule() /// protected override IConfiguration BuildCheckpointConfig() { - var filePath = Path.Combine(Path.GetTempPath(), System.Guid.NewGuid() + "state.txt"); + var filePath = TangFactory.GetTang().NewInjector().GetInstance().CreateTempDirectory("statefiles", string.Empty); - return CheckpointConfigurationModule.ConfigurationModule - .Set(CheckpointConfigurationModule.CheckpointFile, filePath) - .Set(CheckpointConfigurationModule.TaskStateCodec, GenericType.Class) + return 
CheckpointConfigurationBuilder.ConfigurationModule + .Set(CheckpointConfigurationBuilder.CheckpointFilePath, filePath) + .Set(CheckpointConfigurationBuilder.TaskStateCodec, GenericType.Class) .Build(); }