-
Notifications
You must be signed in to change notification settings - Fork 396
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix writing of model in the local file system #528
Conversation
Thanks for the contribution! It looks like @ijeri is an internal user so signing the CLA is not required. However, we need to confirm this. |
Codecov Report
@@ Coverage Diff @@
## master #528 +/- ##
==========================================
+ Coverage 82.47% 86.74% +4.26%
==========================================
Files 347 347
Lines 11952 11956 +4
Branches 377 384 +7
==========================================
+ Hits 9858 10371 +513
+ Misses 2094 1585 -509
Continue to review full report at Codecov.
|
val localPath = new Path(modelStagingDir) | ||
val conf = new Configuration() | ||
val localFileSystem = FileSystem.getLocal(conf) | ||
if (shouldOverwrite) localFileSystem.delete(localPath, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to do the overwriting in an atomic transaction? Eg. so we can't get into a state where the old model got deleted, but the new model failed to save?
|
||
val modelJson = toJsonString(raw.toString) | ||
val jsonPath = OpWorkflowModelReadWriteShared.jsonPath(raw.toString) | ||
val out = localFileSystem.create(new Path(jsonPath, "part-00000"), shouldOverwrite) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels weird to need to hardcode the file name here to part-00000
. Is there a way to let the hadoop filesystem determine the naming for us? Would we run into problems if the file is too large and hadoop wants to break it into many part-xxxxx files?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially thought that model reader needs the part-0000
but looks like it does not https://github.com/salesforce/TransmogrifAI/blob/master/core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala#L282. So, I will remove it and then it will be just op-model.json
.
We are writing modelJson
to a local file system, zipping it and then moving to remote s3 path. The bug here was that we were trying to write modelJson
by doing
sc.parallelize(Seq(modelJson)).saveAsTextFile(jsonPath)
The above code will only work if the file is of the hadoop file system. So to write to local file system I am using standard file operations. So, there is no breaking it into many part-xxxx files as the modelJson
is not big data.
val localPath = new Path(modelStagingDir) | ||
val conf = new Configuration() | ||
val localFileSystem = FileSystem.getLocal(conf) | ||
if (shouldOverwrite) localFileSystem.delete(localPath, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this line is no longer necessary now that we have val out = localFileSystem.create(new Path(jsonPath), shouldOverwrite)
a bit below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we should always do localFileSystem.delete(localPath, true)
irrespective of shouldOverwrite
. shouldOverwrite should be applicable to the remote path i.e the passed to saveImpl
fn.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Right, they're independent
|
||
val modelJson = toJsonString(raw.toString) | ||
val jsonPath = OpWorkflowModelReadWriteShared.jsonPath(raw.toString) | ||
val out = localFileSystem.create(new Path(jsonPath), shouldOverwrite) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we set shouldOverwrite
to true
? If destinationFileSystem.moveFromLocalFile(compressed, finalPath)
below ever fails to delete after copying we'll need manual intervention.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This whole saveImpl
function is called inside save
function which takes cares of if destinationFileSystem.moveFromLocalFile(compressed, finalPath) ever fails to delete.
def save(path: String): Unit = {
new FileSystemOverwrite().handleOverwrite(path, shouldOverwrite, sc)
saveImpl(path)
}
private[ml] class FileSystemOverwrite extends Logging {
def handleOverwrite(path: String, shouldOverwrite: Boolean, sc: SparkContext): Unit = {
val hadoopConf = sc.hadoopConfiguration
val outputPath = new Path(path)
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
if (fs.exists(qualifiedOutputPath)) {
if (shouldOverwrite) {
logInfo(s"Path $path already exists. It will be overwritten.")
// TODO: Revert back to the original content if save is not successful.
fs.delete(qualifiedOutputPath, true)
} else {
throw new IOException(s"Path $path already exists. To overwrite it, " +
s"please use write.overwrite().save(path) for Scala and use " +
s"write().overwrite().save(path) for Java and Python.")
}
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was referring to doing localFileSystem.create(new Path(jsonPath), true)
-- which you already changed -- not the overwriting on the s3 path.
val out = localFileSystem.create(new Path(jsonPath), shouldOverwrite) | ||
val os = new BufferedOutputStream(out) | ||
os.write(modelJson.getBytes("UTF-8")) | ||
os.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to make sure to close even in case of exceptions. Something along the lines of try...finally or https://www.scala-lang.org/api/current/scala/util/Using$.html (2.13 only, though).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
@ijeri can you give an example of the local path? Do you mean because the path is not absolute? Saving RDD to a local path should work even if the local filesystem is not the default one:
|
+1 to stop gzipping op-model json since we now zip the whole bundle |
@gerashegalov the file path is a qualified one for ex- |
makes sense: after rdd is distributed partitions are written out to the worker node local dirs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, some comments
localFileSystem.delete(localPath, true) | ||
val raw = new Path(localPath, WorkflowFileReader.rawModel) | ||
|
||
val modelJson = toJsonString(raw.toString) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DRY: avoid repeated toString
|
||
val modelJson = toJsonString(raw.toString) | ||
val jsonPath = OpWorkflowModelReadWriteShared.jsonPath(raw.toString) | ||
val os = new BufferedOutputStream(localFileSystem.create(new Path(jsonPath))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Buffering is superfluous. Let use the FSDataOutputStream directly. It's actually backed by the BufferedOutputStream
.
val jsonPath = OpWorkflowModelReadWriteShared.jsonPath(raw.toString) | ||
val os = new BufferedOutputStream(localFileSystem.create(new Path(jsonPath))) | ||
try { | ||
os.write(modelJson.getBytes("UTF-8")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use a constant here StandardCharsets.UTF_8.toString
@@ -232,7 +232,12 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec with PassengerSparkFixtureTest | |||
dirFile.isDirectory shouldBe true | |||
// TODO: maybe do a thorough files inspection here | |||
val files = FileUtils.listFiles(dirFile, null, true) | |||
files.asScala.map(_.toString).exists(_.contains("_SUCCESS")) shouldBe true | |||
if (outFile.getAbsolutePath.endsWith("/model")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe:
val fileNames = files.asScala.map(_.getName)
if (outFile.getAbsolutePath.endsWith("/model")) {
fileNames should contain ("op-model.json")
}
else {
fileNames should contain ("_SUCCESS")
}
@gerashegalov I have addressed your comments. Please take a look again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
7501ab5
to
70ac3f6
Compare
c902bf9
to
f0db8d9
Compare
Thanks for the contribution! Unfortunately we can't verify the commit author(s): Sanmitra Ijeri <s***@s***.com>. One possible solution is to add that email to your GitHub account. Alternatively you can change your commits to another email and force push the change. After getting your commits associated with your GitHub account, refresh the status of this Pull Request. |
replace tuple with case class for misclassification per category fixed ModelSelectorSummaryTest 1) Use binary search impl from Scala 2)increased scoring time in OpWorkflowModelLocalTest 3)renaming confusion matrix calculation params Fix writing of model in the local file system (salesforce#528) test user cla:missing update RecordInisghtsLOCOTest Threshold Fix writing of model in the local file system (salesforce#528) test user cla:missing
replace tuple with case class for misclassification per category fixed ModelSelectorSummaryTest 1) Use binary search impl from Scala 2)increased scoring time in OpWorkflowModelLocalTest 3)renaming confusion matrix calculation params Fix writing of model in the local file system (salesforce#528) test user cla:missing update RecordInisghtsLOCOTest Threshold Fix writing of model in the local file system (salesforce#528) test user cla:missing increased threshold to 1.0 to fix this flaky test Fix writing of model in the local file system (salesforce#528) (1)set the default value of topN classes 10 (2)added confMatrixClassIndices attribute
Related issues
The model loading in a cluster was failing with error - op-model.json/part-00000 did not exist.
Additional context
In #516 we made the changes to save model locally and convert to zip before moving to remote path. So the path passed to
the below method
saveImpl
is of a local file system, hence when we are trying to save it as a HadoopFile, op-model.json/part-00000 is not being written out.